diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2022-03-13 02:28:10 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2022-03-13 02:28:10 +0100 |
commit | ed1647b9446f84d81d41e8e28ccf063eff97b2f7 (patch) | |
tree | 7f22929aca611955ea129dc0afee839bb63872bf /nDPId.c | |
parent | dd35d9da3fd43f1091b8ec496ec25d72e54d8e22 (diff) |
Disconnect nDPIsrvd clients immediately instead waiting for a failed write().
* nDPIsrvd: Collector/Distributor logging improved
* nDPIsrvd: Command line option for max remote descriptors
* nDPId: Stop spamming nDPIsrvd Collector with the same events over and over again
* nDPId: Refactored some variable names and events
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r-- | nDPId.c | 380 |
1 files changed, 244 insertions, 136 deletions
@@ -46,8 +46,8 @@ #define DLT_DSA_TAG_EDSA 285 #endif -#if ((NDPI_MAJOR == 3 && NDPI_MINOR < 5) || NDPI_MAJOR < 3) && NDPI_API_VERSION < 4087 -#error "nDPI >= 3.5.0 or API version >= 4087 required" +#if ((NDPI_MAJOR == 4 && NDPI_MINOR < 4) || NDPI_MAJOR < 4) && NDPI_API_VERSION < 5892 +#error "nDPI >= 4.4.0 or API version >= 5892 required" #endif #if !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) || !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_8) @@ -122,7 +122,7 @@ struct nDPId_flow_extended { struct nDPId_flow_basic flow_basic; - unsigned int long long flow_id; + unsigned long long int flow_id; uint16_t min_l4_payload_len; uint16_t max_l4_payload_len; @@ -190,8 +190,13 @@ struct nDPId_workflow { pcap_t * pcap_handle; - uint16_t error_or_eof; - uint16_t is_pcap_file; + uint8_t error_or_eof; + uint8_t is_pcap_file; + + uint8_t max_flow_to_track_reached : 1; + uint8_t flow_allocation_already_failed : 1; + + uint8_t reserved_00; unsigned long long int packets_captured; unsigned long long int packets_processed; @@ -235,9 +240,9 @@ struct nDPId_reader_thread { struct nDPId_workflow * workflow; pthread_t thread_id; - int json_sockfd; - int json_sock_reconnect; - int array_index; + int collector_sockfd; + int collector_sock_reconnect; + size_t array_index; }; enum packet_event @@ -386,7 +391,7 @@ static struct char * custom_categories_file; char * custom_ja3_file; char * custom_sha1_file; - char json_sockpath[UNIX_PATH_MAX]; + char collector_sockpath[UNIX_PATH_MAX]; #ifdef ENABLE_ZLIB uint8_t enable_zlib_compression; #endif @@ -414,7 +419,7 @@ static struct unsigned long long int max_packets_per_flow_to_process; } nDPId_options = {.pidfile = nDPId_PIDFILE, .user = "nobody", - .json_sockpath = COLLECTOR_UNIX_SOCKET, + .collector_sockpath = COLLECTOR_UNIX_SOCKET, .max_flows_per_thread = nDPId_MAX_FLOWS_PER_THREAD / 2, .max_idle_flows_per_thread = nDPId_MAX_IDLE_FLOWS_PER_THREAD / 2, .tick_resolution = nDPId_TICK_RESOLUTION, @@ -1188,7 +1193,7 @@ static void log_flows(struct nDPId_reader_thread const * const reader_thread) } logger(0, - "MemoryProfiler flow stats: [thread: %d][unknown: %llu][skipped: %llu][finished: %llu][info: %llu]", + "MemoryProfiler flow stats: [thread: %zu][unknown: %llu][skipped: %llu][finished: %llu][info: %llu]", reader_thread->array_index, log_user_data.flows_ukn, log_user_data.flows_skp, @@ -1199,7 +1204,7 @@ static void log_flows(struct nDPId_reader_thread const * const reader_thread) if (flows_log_str_used > 0) { logger(0, - "MemoryProfiler flows active (finished): [thread: %d][%.*s]", + "MemoryProfiler flows active (finished): [thread: %zu][%.*s]", reader_thread->array_index, (int)flows_log_str_used, flows_log_str); @@ -1209,7 +1214,7 @@ static void log_flows(struct nDPId_reader_thread const * const reader_thread) if (flows_log_str_used > 0) { logger(0, - "MemoryProfiler flows active (info): [thread: %d][%.*s]", + "MemoryProfiler flows active (info): [thread: %zu][%.*s]", reader_thread->array_index, (int)flows_log_str_used, flows_log_str); @@ -1905,11 +1910,14 @@ static void jsonize_l3_l4(struct nDPId_workflow * const workflow, struct nDPId_f } } -static void jsonize_basic(struct nDPId_reader_thread * const reader_thread) +static void jsonize_basic(struct nDPId_reader_thread * const reader_thread, int serialize_thread_id) { struct nDPId_workflow * const workflow = reader_thread->workflow; - ndpi_serialize_string_int32(&workflow->ndpi_serializer, "thread_id", reader_thread->array_index); + if (serialize_thread_id != 0) + { + ndpi_serialize_string_int32(&workflow->ndpi_serializer, "thread_id", reader_thread->array_index); + } ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "packet_id", workflow->packets_captured); ndpi_serialize_string_string(&workflow->ndpi_serializer, "source", nDPId_options.pcap_file_or_interface); ndpi_serialize_string_string(&workflow->ndpi_serializer, "alias", nDPId_options.instance_alias); @@ -1935,7 +1943,7 @@ static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enu ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, daemon_event_name_table[DAEMON_EVENT_INVALID]); } - jsonize_basic(reader_thread); + jsonize_basic(reader_thread, 1); switch (event) { @@ -2040,58 +2048,60 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_msec", workflow->last_thread_time); } -static int connect_to_json_socket(struct nDPId_reader_thread * const reader_thread) +static int connect_to_collector(struct nDPId_reader_thread * const reader_thread) { struct sockaddr_un saddr; - if (reader_thread->json_sockfd >= 0) + if (reader_thread->collector_sockfd >= 0) { - close(reader_thread->json_sockfd); + close(reader_thread->collector_sockfd); } - reader_thread->json_sockfd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0); - if (reader_thread->json_sockfd < 0) + reader_thread->collector_sockfd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0); + if (reader_thread->collector_sockfd < 0) { - reader_thread->json_sock_reconnect = 1; + reader_thread->collector_sock_reconnect = 1; return 1; } int opt = NETWORK_BUFFER_MAX_SIZE * 16; - if (setsockopt(reader_thread->json_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0) + if (setsockopt(reader_thread->collector_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0) { return 1; } saddr.sun_family = AF_UNIX; - int written = snprintf(saddr.sun_path, sizeof(saddr.sun_path), "%s", nDPId_options.json_sockpath); + int written = snprintf(saddr.sun_path, sizeof(saddr.sun_path), "%s", nDPId_options.collector_sockpath); if (written < 0) { return 1; } - if (connect(reader_thread->json_sockfd, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) + if (connect(reader_thread->collector_sockfd, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { - reader_thread->json_sock_reconnect = 1; + reader_thread->collector_sock_reconnect = 1; return 1; } - if (shutdown(reader_thread->json_sockfd, SHUT_RD) != 0) + if (shutdown(reader_thread->collector_sockfd, SHUT_RD) != 0) { return 1; } - if (fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) | O_NONBLOCK) == -1) + if (fcntl(reader_thread->collector_sockfd, + F_SETFL, + fcntl(reader_thread->collector_sockfd, F_GETFL, 0) | O_NONBLOCK) == -1) { - reader_thread->json_sock_reconnect = 1; + reader_thread->collector_sock_reconnect = 1; return 1; } - reader_thread->json_sock_reconnect = 0; + reader_thread->collector_sock_reconnect = 0; return 0; } -static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread, +static void send_to_collector(struct nDPId_reader_thread * const reader_thread, char const * const json_str, size_t json_str_len) { @@ -2110,7 +2120,7 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread, if (s_ret < 0 || s_ret == (int)sizeof(newline_json_str)) { logger(1, - "[%8llu, %d] JSON buffer prepare failed: snprintf returned %d, buffer size %zu", + "[%8llu, %zu] JSON buffer prepare failed: snprintf returned %d, buffer size %zu", workflow->packets_captured, reader_thread->array_index, s_ret, @@ -2118,12 +2128,12 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread, return; } - if (reader_thread->json_sock_reconnect != 0) + if (reader_thread->collector_sock_reconnect != 0) { - if (connect_to_json_socket(reader_thread) == 0) + if (connect_to_collector(reader_thread) == 0) { logger(1, - "[%8llu, %d] Reconnected to nDPIsrvd Collector", + "[%8llu, %zu] Reconnected to nDPIsrvd Collector", workflow->packets_captured, reader_thread->array_index); jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT); @@ -2132,35 +2142,38 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread, errno = 0; ssize_t written; - if (reader_thread->json_sock_reconnect == 0 && - (written = write(reader_thread->json_sockfd, newline_json_str, s_ret)) != s_ret) + if (reader_thread->collector_sock_reconnect == 0 && + (written = write(reader_thread->collector_sockfd, newline_json_str, s_ret)) != s_ret) { saved_errno = errno; if (saved_errno == EPIPE) { logger(1, - "[%8llu, %d] Lost connection to nDPIsrvd Collector", + "[%8llu, %zu] Lost connection to nDPIsrvd Collector", workflow->packets_captured, reader_thread->array_index); } if (saved_errno != EAGAIN) { - reader_thread->json_sock_reconnect = 1; + reader_thread->collector_sock_reconnect = 1; } else { - fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) & ~O_NONBLOCK); + fcntl(reader_thread->collector_sockfd, + F_SETFL, + fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK); off_t pos = (written < 0 ? 0 : written); - while ((written = write(reader_thread->json_sockfd, newline_json_str + pos, s_ret - pos)) != s_ret - pos) + while ((written = write(reader_thread->collector_sockfd, newline_json_str + pos, s_ret - pos)) != + s_ret - pos) { if (written < 0) { logger(1, - "[%8llu, %d] Send data (blocking I/O) to nDPIsrvd Collector failed: %s", + "[%8llu, %zu] Send data (blocking I/O) to nDPIsrvd Collector failed: %s", workflow->packets_captured, reader_thread->array_index, strerror(saved_errno)); - reader_thread->json_sock_reconnect = 1; + reader_thread->collector_sock_reconnect = 1; break; } else @@ -2168,7 +2181,9 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread, pos += written; } } - fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) & O_NONBLOCK); + fcntl(reader_thread->collector_sockfd, + F_SETFL, + fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & O_NONBLOCK); } } } @@ -2182,7 +2197,7 @@ static void serialize_and_send(struct nDPId_reader_thread * const reader_thread) if (json_str == NULL || json_str_len == 0) { logger(1, - "[%8llu, %d] jsonize failed, buffer length: %u", + "[%8llu, %zu] jsonize failed, buffer length: %u", reader_thread->workflow->packets_captured, reader_thread->array_index, json_str_len); @@ -2190,7 +2205,7 @@ static void serialize_and_send(struct nDPId_reader_thread * const reader_thread) else { reader_thread->workflow->total_events_serialized++; - send_to_json_sink(reader_thread, json_str, json_str_len); + send_to_collector(reader_thread, json_str, json_str_len); } ndpi_reset_serializer(&reader_thread->workflow->ndpi_serializer); } @@ -2319,7 +2334,7 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa if (flow_ext == NULL) { logger(1, - "[%8llu, %d] BUG: got a PACKET_EVENT_PAYLOAD_FLOW with a flow pointer equals NULL", + "[%8llu, %zu] BUG: got a PACKET_EVENT_PAYLOAD_FLOW with a flow pointer equals NULL", reader_thread->workflow->packets_captured, reader_thread->array_index); return; @@ -2340,7 +2355,7 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, packet_event_name_table[PACKET_EVENT_INVALID]); } - jsonize_basic(reader_thread); + jsonize_basic(reader_thread, (event == PACKET_EVENT_PAYLOAD_FLOW ? 1 : 0)); if (event == PACKET_EVENT_PAYLOAD_FLOW) { @@ -2370,7 +2385,7 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa if (ndpi_serialize_string_binary(&workflow->ndpi_serializer, "pkt", base64_data, base64_data_len) != 0) { logger(1, - "[%8llu, %d] JSON serializing base64 packet buffer failed", + "[%8llu, %zu] JSON serializing base64 packet buffer failed", reader_thread->workflow->packets_captured, reader_thread->array_index); } @@ -2378,7 +2393,7 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa else { logger(1, - "[%8llu, %d] Base64 encoding failed with: %s.", + "[%8llu, %zu] Base64 encoding failed with: %s.", reader_thread->workflow->packets_captured, reader_thread->array_index, base64_ret_strings[base64_retval]); @@ -2403,7 +2418,7 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, { ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[FLOW_EVENT_INVALID]); } - jsonize_basic(reader_thread); + jsonize_basic(reader_thread, 1); jsonize_flow(workflow, flow_ext); jsonize_l3_l4(workflow, &flow_ext->flow_basic); @@ -2466,7 +2481,7 @@ static void jsonize_flow_detection_event(struct nDPId_reader_thread * const read { ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[FLOW_EVENT_INVALID]); } - jsonize_basic(reader_thread); + jsonize_basic(reader_thread, 1); jsonize_flow(workflow, &flow_info->flow_extended); jsonize_l3_l4(workflow, &flow_info->flow_extended.flow_basic); @@ -2686,7 +2701,16 @@ __attribute__((format(printf, 3, 4))) static void jsonize_basic_eventf(struct nD "datalink", pcap_datalink(reader_thread->workflow->pcap_handle)); - jsonize_basic(reader_thread); + switch (event) + { + case MAX_FLOW_TO_TRACK: + case FLOW_MEMORY_ALLOCATION_FAILED: + jsonize_basic(reader_thread, 1); + break; + default: + jsonize_basic(reader_thread, 0); + break; + } if (format != NULL) { @@ -3143,10 +3167,12 @@ static void do_periodically_work(struct nDPId_reader_thread * const reader_threa reader_thread->workflow->last_scan_time = reader_thread->workflow->last_global_time; } if (reader_thread->workflow->last_status_time + nDPId_options.daemon_status_interval + - reader_thread->array_index * 1000 <= reader_thread->workflow->last_global_time) + reader_thread->array_index * 1000 <= + reader_thread->workflow->last_global_time) { jsonize_daemon(reader_thread, DAEMON_EVENT_STATUS); - reader_thread->workflow->last_status_time = reader_thread->workflow->last_global_time + reader_thread->array_index * 1000; + reader_thread->workflow->last_status_time = + reader_thread->workflow->last_global_time + reader_thread->array_index * 1000; } #ifdef ENABLE_MEMORY_PROFILING if (reader_thread->workflow->last_memory_usage_log_time + nDPId_options.memory_profiling_log_interval <= @@ -3159,6 +3185,12 @@ static void do_periodically_work(struct nDPId_reader_thread * const reader_threa #endif } +static int distribute_single_packet(struct nDPId_reader_thread * const reader_thread) +{ + return (reader_thread->workflow->packets_captured % nDPId_options.reader_thread_count == + reader_thread->array_index); +} + static void ndpi_process_packet(uint8_t * const args, struct pcap_pkthdr const * const header, uint8_t const * const packet) @@ -3185,7 +3217,7 @@ static void ndpi_process_packet(uint8_t * const args, uint16_t l4_payload_len = 0; uint16_t type = 0; - int thread_index = nDPId_THREAD_DISTRIBUTION_SEED; // generated with `dd if=/dev/random bs=1024 count=1 |& hd' + size_t thread_index = nDPId_THREAD_DISTRIBUTION_SEED; // generated with `dd if=/dev/random bs=1024 count=1 |& hd' if (reader_thread == NULL) { @@ -3225,8 +3257,11 @@ static void ndpi_process_packet(uint8_t * const args, } else { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); - jsonize_basic_eventf(reader_thread, UNKNOWN_L3_PROTOCOL, "%s%u", "protocol", type); + if (distribute_single_packet(reader_thread) != 0) + { + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + jsonize_basic_eventf(reader_thread, UNKNOWN_L3_PROTOCOL, "%s%u", "protocol", type); + } return; } ip_size = header->caplen - ip_offset; @@ -3235,14 +3270,17 @@ static void ndpi_process_packet(uint8_t * const args, { if (header->caplen < header->len) { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); - jsonize_basic_eventf(reader_thread, - CAPTURE_SIZE_SMALLER_THAN_PACKET_SIZE, - "%s%u %s%u", - "size", - header->caplen, - "expected", - header->len); + if (distribute_single_packet(reader_thread) != 0) + { + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + jsonize_basic_eventf(reader_thread, + CAPTURE_SIZE_SMALLER_THAN_PACKET_SIZE, + "%s%u %s%u", + "size", + header->caplen, + "expected", + header->len); + } } } @@ -3251,9 +3289,17 @@ static void ndpi_process_packet(uint8_t * const args, { if (ip_size < sizeof(*ip)) { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); - jsonize_basic_eventf( - reader_thread, IP4_SIZE_SMALLER_THAN_HEADER, "%s%u %s%zu", "size", ip_size, "expected", sizeof(*ip)); + if (distribute_single_packet(reader_thread) != 0) + { + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + jsonize_basic_eventf(reader_thread, + IP4_SIZE_SMALLER_THAN_HEADER, + "%s%u %s%zu", + "size", + ip_size, + "expected", + sizeof(*ip)); + } return; } @@ -3262,9 +3308,12 @@ static void ndpi_process_packet(uint8_t * const args, if (ndpi_detection_get_l4( (uint8_t *)ip, ip_size, &l4_ptr, &l4_len, &flow_basic.l4_protocol, NDPI_DETECTION_ONLY_IPV4) != 0) { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); - jsonize_basic_eventf( - reader_thread, IP4_L4_PAYLOAD_DETECTION_FAILED, "%s%zu", "l4_data_len", ip_size - sizeof(*ip)); + if (distribute_single_packet(reader_thread) != 0) + { + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + jsonize_basic_eventf( + reader_thread, IP4_L4_PAYLOAD_DETECTION_FAILED, "%s%zu", "l4_data_len", ip_size - sizeof(*ip)); + } return; } @@ -3277,14 +3326,17 @@ static void ndpi_process_packet(uint8_t * const args, { if (ip_size < sizeof(ip6->ip6_hdr)) { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); - jsonize_basic_eventf(reader_thread, - IP6_SIZE_SMALLER_THAN_HEADER, - "%s%u %s%zu", - "size", - ip_size, - "expected", - sizeof(ip6->ip6_hdr)); + if (distribute_single_packet(reader_thread) != 0) + { + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + jsonize_basic_eventf(reader_thread, + IP6_SIZE_SMALLER_THAN_HEADER, + "%s%u %s%zu", + "size", + ip_size, + "expected", + sizeof(ip6->ip6_hdr)); + } return; } @@ -3292,9 +3344,12 @@ static void ndpi_process_packet(uint8_t * const args, if (ndpi_detection_get_l4( (uint8_t *)ip6, ip_size, &l4_ptr, &l4_len, &flow_basic.l4_protocol, NDPI_DETECTION_ONLY_IPV6) != 0) { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); - jsonize_basic_eventf( - reader_thread, IP6_L4_PAYLOAD_DETECTION_FAILED, "%s%zu", "l4_data_len", ip_size - sizeof(*ip)); + if (distribute_single_packet(reader_thread) != 0) + { + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + jsonize_basic_eventf( + reader_thread, IP6_L4_PAYLOAD_DETECTION_FAILED, "%s%zu", "l4_data_len", ip_size - sizeof(*ip)); + } return; } @@ -3319,8 +3374,11 @@ static void ndpi_process_packet(uint8_t * const args, } else { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); - jsonize_basic_eventf(reader_thread, UNKNOWN_L3_PROTOCOL, "%s%u", "protocol", type); + if (distribute_single_packet(reader_thread) != 0) + { + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + jsonize_basic_eventf(reader_thread, UNKNOWN_L3_PROTOCOL, "%s%u", "protocol", type); + } return; } @@ -3331,15 +3389,25 @@ static void ndpi_process_packet(uint8_t * const args, if (header->caplen < (l4_ptr - packet) + sizeof(struct ndpi_tcphdr)) { - jsonize_packet_event( - reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD); - jsonize_basic_eventf(reader_thread, - TCP_PACKET_TOO_SHORT, - "%s%u %s%zu", - "size", - header->caplen, - "expected", - (l4_ptr - packet) + sizeof(struct ndpi_tcphdr)); + if (distribute_single_packet(reader_thread) != 0) + { + jsonize_packet_event(reader_thread, + header, + packet, + type, + ip_offset, + (l4_ptr - packet), + l4_len, + NULL, + PACKET_EVENT_PAYLOAD); + jsonize_basic_eventf(reader_thread, + TCP_PACKET_TOO_SHORT, + "%s%u %s%zu", + "size", + header->caplen, + "expected", + (l4_ptr - packet) + sizeof(struct ndpi_tcphdr)); + } return; } tcp = (struct ndpi_tcphdr *)l4_ptr; @@ -3355,15 +3423,25 @@ static void ndpi_process_packet(uint8_t * const args, if (header->caplen < (l4_ptr - packet) + sizeof(struct ndpi_udphdr)) { - jsonize_packet_event( - reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD); - jsonize_basic_eventf(reader_thread, - UDP_PACKET_TOO_SHORT, - "%s%u %s%zu", - "size", - header->caplen, - "expected", - (l4_ptr - packet) + sizeof(struct ndpi_udphdr)); + if (distribute_single_packet(reader_thread) != 0) + { + jsonize_packet_event(reader_thread, + header, + packet, + type, + ip_offset, + (l4_ptr - packet), + l4_len, + NULL, + PACKET_EVENT_PAYLOAD); + jsonize_basic_eventf(reader_thread, + UDP_PACKET_TOO_SHORT, + "%s%u %s%zu", + "size", + header->caplen, + "expected", + (l4_ptr - packet) + sizeof(struct ndpi_udphdr)); + } return; } udp = (struct ndpi_udphdr *)l4_ptr; @@ -3520,31 +3598,57 @@ static void ndpi_process_packet(uint8_t * const args, if (workflow->cur_active_flows == workflow->max_active_flows) { - jsonize_packet_event( - reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD); - jsonize_basic_eventf(reader_thread, - MAX_FLOW_TO_TRACK, - "%s%llu %s%llu %s%llu %s%llu", - "current_active", - workflow->cur_active_flows, - "current_idle", - workflow->cur_idle_flows, - "max_active", - workflow->max_active_flows, - "max_idle", - workflow->max_idle_flows); + if (workflow->max_flow_to_track_reached == 0) + { + workflow->max_flow_to_track_reached = 1; + + jsonize_packet_event(reader_thread, + header, + packet, + type, + ip_offset, + (l4_ptr - packet), + l4_len, + NULL, + PACKET_EVENT_PAYLOAD); + jsonize_basic_eventf(reader_thread, + MAX_FLOW_TO_TRACK, + "%s%llu %s%llu %s%llu %s%llu", + "current_active", + workflow->cur_active_flows, + "current_idle", + workflow->cur_idle_flows, + "max_active", + workflow->max_active_flows, + "max_idle", + workflow->max_idle_flows); + } return; } + workflow->max_flow_to_track_reached = 0; flow_to_process = (struct nDPId_flow_info *)add_new_flow(workflow, &flow_basic, FS_INFO, hashed_index); if (flow_to_process == NULL) { - jsonize_packet_event( - reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD); - jsonize_basic_eventf( - reader_thread, FLOW_MEMORY_ALLOCATION_FAILED, "%s%zu", "size", sizeof(*flow_to_process)); + if (workflow->flow_allocation_already_failed == 0) + { + workflow->flow_allocation_already_failed = 1; + + jsonize_packet_event(reader_thread, + header, + packet, + type, + ip_offset, + (l4_ptr - packet), + l4_len, + NULL, + PACKET_EVENT_PAYLOAD); + jsonize_basic_eventf( + reader_thread, FLOW_MEMORY_ALLOCATION_FAILED, "%s%zu", "size", sizeof(*flow_to_process)); + } return; } + workflow->flow_allocation_already_failed = 0; workflow->total_active_flows++; flow_to_process->flow_extended.flow_id = __sync_fetch_and_add(&global_flow_id, 1); @@ -3788,6 +3892,10 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) nready = epoll_wait(epoll_fd, events, events_size, timeout_ms); if (errno != 0) { + if (errno == EINTR) + { + continue; + } logger(1, "Epoll returned error: %s", strerror(errno)); __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); break; @@ -3841,16 +3949,16 @@ static void * processing_thread(void * const ndpi_thread_arg) { struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)ndpi_thread_arg; - reader_thread->json_sockfd = -1; - reader_thread->json_sock_reconnect = 1; + reader_thread->collector_sockfd = -1; + reader_thread->collector_sock_reconnect = 1; errno = 0; - if (connect_to_json_socket(reader_thread) != 0) + if (connect_to_collector(reader_thread) != 0) { logger(1, - "Thread %u: Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s", + "Thread %zu: Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s", reader_thread->array_index, - nDPId_options.json_sockpath, + nDPId_options.collector_sockpath, (errno != 0 ? strerror(errno) : "Internal Error.")); } else @@ -3859,7 +3967,7 @@ static void * processing_thread(void * const ndpi_thread_arg) } run_pcap_loop(reader_thread); - fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) & ~O_NONBLOCK); + fcntl(reader_thread->collector_sockfd, F_SETFL, fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK); __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); return NULL; } @@ -3982,13 +4090,13 @@ static void process_remaining_flows(void) { for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i) { - if (fcntl(reader_threads[i].json_sockfd, + if (fcntl(reader_threads[i].collector_sockfd, F_SETFL, - fcntl(reader_threads[i].json_sockfd, F_GETFL, 0) & ~O_NONBLOCK) == -1) + fcntl(reader_threads[i].collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK) == -1) { logger(1, "Could not set JSON fd %d to blocking mode for shutdown: %s", - reader_threads[i].json_sockfd, + reader_threads[i].collector_sockfd, strerror(errno)); } @@ -4060,7 +4168,7 @@ static int stop_reader_threads(void) total_flow_updates += reader_threads[i].workflow->total_flow_updates; printf( - "Stopping Thread %2d, processed %llu packets, %llu bytes\n" + "Stopping Thread %2zu, processed %llu packets, %llu bytes\n" "\tskipped flows.....: %8llu, processed flows: %8llu, idle flows....: %8llu\n" "\tnot detected flows: %8llu, guessed flows..: %8llu, detected flows: %8llu\n" "\tdetection updates.: %8llu, updated flows..: %8llu\n", @@ -4265,8 +4373,8 @@ static int nDPId_parse_options(int argc, char ** argv) } break; case 'c': - strncpy(nDPId_options.json_sockpath, optarg, sizeof(nDPId_options.json_sockpath) - 1); - nDPId_options.json_sockpath[sizeof(nDPId_options.json_sockpath) - 1] = '\0'; + strncpy(nDPId_options.collector_sockpath, optarg, sizeof(nDPId_options.collector_sockpath) - 1); + nDPId_options.collector_sockpath[sizeof(nDPId_options.collector_sockpath) - 1] = '\0'; break; case 'd': daemonize_enable(); @@ -4441,17 +4549,17 @@ static int validate_options(void) } } #endif - if (is_path_absolute("JSON socket", nDPId_options.json_sockpath) != 0) + if (is_path_absolute("Collector socket", nDPId_options.collector_sockpath) != 0) { retval = 1; } { struct sockaddr_un saddr; - if (strlen(nDPId_options.json_sockpath) >= sizeof(saddr.sun_path)) + if (strlen(nDPId_options.collector_sockpath) >= sizeof(saddr.sun_path)) { logger_early(1, - "JSON socket path too long, current/max: %zu/%zu", - strlen(nDPId_options.json_sockpath), + "Collector socket path too long, current/max: %zu/%zu", + strlen(nDPId_options.collector_sockpath), sizeof(saddr.sun_path) - 1); retval = 1; } |