summaryrefslogtreecommitdiff
path: root/nDPId.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2022-03-13 02:28:10 +0100
committerToni Uhlig <matzeton@googlemail.com>2022-03-13 02:28:10 +0100
commited1647b9446f84d81d41e8e28ccf063eff97b2f7 (patch)
tree7f22929aca611955ea129dc0afee839bb63872bf /nDPId.c
parentdd35d9da3fd43f1091b8ec496ec25d72e54d8e22 (diff)
Disconnect nDPIsrvd clients immediately instead waiting for a failed write().
* nDPIsrvd: Collector/Distributor logging improved * nDPIsrvd: Command line option for max remote descriptors * nDPId: Stop spamming nDPIsrvd Collector with the same events over and over again * nDPId: Refactored some variable names and events Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r--nDPId.c380
1 files changed, 244 insertions, 136 deletions
diff --git a/nDPId.c b/nDPId.c
index a7e41fb4a..a6c00642e 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -46,8 +46,8 @@
#define DLT_DSA_TAG_EDSA 285
#endif
-#if ((NDPI_MAJOR == 3 && NDPI_MINOR < 5) || NDPI_MAJOR < 3) && NDPI_API_VERSION < 4087
-#error "nDPI >= 3.5.0 or API version >= 4087 required"
+#if ((NDPI_MAJOR == 4 && NDPI_MINOR < 4) || NDPI_MAJOR < 4) && NDPI_API_VERSION < 5892
+#error "nDPI >= 4.4.0 or API version >= 5892 required"
#endif
#if !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) || !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_8)
@@ -122,7 +122,7 @@ struct nDPId_flow_extended
{
struct nDPId_flow_basic flow_basic;
- unsigned int long long flow_id;
+ unsigned long long int flow_id;
uint16_t min_l4_payload_len;
uint16_t max_l4_payload_len;
@@ -190,8 +190,13 @@ struct nDPId_workflow
{
pcap_t * pcap_handle;
- uint16_t error_or_eof;
- uint16_t is_pcap_file;
+ uint8_t error_or_eof;
+ uint8_t is_pcap_file;
+
+ uint8_t max_flow_to_track_reached : 1;
+ uint8_t flow_allocation_already_failed : 1;
+
+ uint8_t reserved_00;
unsigned long long int packets_captured;
unsigned long long int packets_processed;
@@ -235,9 +240,9 @@ struct nDPId_reader_thread
{
struct nDPId_workflow * workflow;
pthread_t thread_id;
- int json_sockfd;
- int json_sock_reconnect;
- int array_index;
+ int collector_sockfd;
+ int collector_sock_reconnect;
+ size_t array_index;
};
enum packet_event
@@ -386,7 +391,7 @@ static struct
char * custom_categories_file;
char * custom_ja3_file;
char * custom_sha1_file;
- char json_sockpath[UNIX_PATH_MAX];
+ char collector_sockpath[UNIX_PATH_MAX];
#ifdef ENABLE_ZLIB
uint8_t enable_zlib_compression;
#endif
@@ -414,7 +419,7 @@ static struct
unsigned long long int max_packets_per_flow_to_process;
} nDPId_options = {.pidfile = nDPId_PIDFILE,
.user = "nobody",
- .json_sockpath = COLLECTOR_UNIX_SOCKET,
+ .collector_sockpath = COLLECTOR_UNIX_SOCKET,
.max_flows_per_thread = nDPId_MAX_FLOWS_PER_THREAD / 2,
.max_idle_flows_per_thread = nDPId_MAX_IDLE_FLOWS_PER_THREAD / 2,
.tick_resolution = nDPId_TICK_RESOLUTION,
@@ -1188,7 +1193,7 @@ static void log_flows(struct nDPId_reader_thread const * const reader_thread)
}
logger(0,
- "MemoryProfiler flow stats: [thread: %d][unknown: %llu][skipped: %llu][finished: %llu][info: %llu]",
+ "MemoryProfiler flow stats: [thread: %zu][unknown: %llu][skipped: %llu][finished: %llu][info: %llu]",
reader_thread->array_index,
log_user_data.flows_ukn,
log_user_data.flows_skp,
@@ -1199,7 +1204,7 @@ static void log_flows(struct nDPId_reader_thread const * const reader_thread)
if (flows_log_str_used > 0)
{
logger(0,
- "MemoryProfiler flows active (finished): [thread: %d][%.*s]",
+ "MemoryProfiler flows active (finished): [thread: %zu][%.*s]",
reader_thread->array_index,
(int)flows_log_str_used,
flows_log_str);
@@ -1209,7 +1214,7 @@ static void log_flows(struct nDPId_reader_thread const * const reader_thread)
if (flows_log_str_used > 0)
{
logger(0,
- "MemoryProfiler flows active (info): [thread: %d][%.*s]",
+ "MemoryProfiler flows active (info): [thread: %zu][%.*s]",
reader_thread->array_index,
(int)flows_log_str_used,
flows_log_str);
@@ -1905,11 +1910,14 @@ static void jsonize_l3_l4(struct nDPId_workflow * const workflow, struct nDPId_f
}
}
-static void jsonize_basic(struct nDPId_reader_thread * const reader_thread)
+static void jsonize_basic(struct nDPId_reader_thread * const reader_thread, int serialize_thread_id)
{
struct nDPId_workflow * const workflow = reader_thread->workflow;
- ndpi_serialize_string_int32(&workflow->ndpi_serializer, "thread_id", reader_thread->array_index);
+ if (serialize_thread_id != 0)
+ {
+ ndpi_serialize_string_int32(&workflow->ndpi_serializer, "thread_id", reader_thread->array_index);
+ }
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "packet_id", workflow->packets_captured);
ndpi_serialize_string_string(&workflow->ndpi_serializer, "source", nDPId_options.pcap_file_or_interface);
ndpi_serialize_string_string(&workflow->ndpi_serializer, "alias", nDPId_options.instance_alias);
@@ -1935,7 +1943,7 @@ static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enu
ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, daemon_event_name_table[DAEMON_EVENT_INVALID]);
}
- jsonize_basic(reader_thread);
+ jsonize_basic(reader_thread, 1);
switch (event)
{
@@ -2040,58 +2048,60 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_msec", workflow->last_thread_time);
}
-static int connect_to_json_socket(struct nDPId_reader_thread * const reader_thread)
+static int connect_to_collector(struct nDPId_reader_thread * const reader_thread)
{
struct sockaddr_un saddr;
- if (reader_thread->json_sockfd >= 0)
+ if (reader_thread->collector_sockfd >= 0)
{
- close(reader_thread->json_sockfd);
+ close(reader_thread->collector_sockfd);
}
- reader_thread->json_sockfd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
- if (reader_thread->json_sockfd < 0)
+ reader_thread->collector_sockfd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
+ if (reader_thread->collector_sockfd < 0)
{
- reader_thread->json_sock_reconnect = 1;
+ reader_thread->collector_sock_reconnect = 1;
return 1;
}
int opt = NETWORK_BUFFER_MAX_SIZE * 16;
- if (setsockopt(reader_thread->json_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0)
+ if (setsockopt(reader_thread->collector_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0)
{
return 1;
}
saddr.sun_family = AF_UNIX;
- int written = snprintf(saddr.sun_path, sizeof(saddr.sun_path), "%s", nDPId_options.json_sockpath);
+ int written = snprintf(saddr.sun_path, sizeof(saddr.sun_path), "%s", nDPId_options.collector_sockpath);
if (written < 0)
{
return 1;
}
- if (connect(reader_thread->json_sockfd, (struct sockaddr *)&saddr, sizeof(saddr)) < 0)
+ if (connect(reader_thread->collector_sockfd, (struct sockaddr *)&saddr, sizeof(saddr)) < 0)
{
- reader_thread->json_sock_reconnect = 1;
+ reader_thread->collector_sock_reconnect = 1;
return 1;
}
- if (shutdown(reader_thread->json_sockfd, SHUT_RD) != 0)
+ if (shutdown(reader_thread->collector_sockfd, SHUT_RD) != 0)
{
return 1;
}
- if (fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) | O_NONBLOCK) == -1)
+ if (fcntl(reader_thread->collector_sockfd,
+ F_SETFL,
+ fcntl(reader_thread->collector_sockfd, F_GETFL, 0) | O_NONBLOCK) == -1)
{
- reader_thread->json_sock_reconnect = 1;
+ reader_thread->collector_sock_reconnect = 1;
return 1;
}
- reader_thread->json_sock_reconnect = 0;
+ reader_thread->collector_sock_reconnect = 0;
return 0;
}
-static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
+static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
char const * const json_str,
size_t json_str_len)
{
@@ -2110,7 +2120,7 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
if (s_ret < 0 || s_ret == (int)sizeof(newline_json_str))
{
logger(1,
- "[%8llu, %d] JSON buffer prepare failed: snprintf returned %d, buffer size %zu",
+ "[%8llu, %zu] JSON buffer prepare failed: snprintf returned %d, buffer size %zu",
workflow->packets_captured,
reader_thread->array_index,
s_ret,
@@ -2118,12 +2128,12 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
return;
}
- if (reader_thread->json_sock_reconnect != 0)
+ if (reader_thread->collector_sock_reconnect != 0)
{
- if (connect_to_json_socket(reader_thread) == 0)
+ if (connect_to_collector(reader_thread) == 0)
{
logger(1,
- "[%8llu, %d] Reconnected to nDPIsrvd Collector",
+ "[%8llu, %zu] Reconnected to nDPIsrvd Collector",
workflow->packets_captured,
reader_thread->array_index);
jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT);
@@ -2132,35 +2142,38 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
errno = 0;
ssize_t written;
- if (reader_thread->json_sock_reconnect == 0 &&
- (written = write(reader_thread->json_sockfd, newline_json_str, s_ret)) != s_ret)
+ if (reader_thread->collector_sock_reconnect == 0 &&
+ (written = write(reader_thread->collector_sockfd, newline_json_str, s_ret)) != s_ret)
{
saved_errno = errno;
if (saved_errno == EPIPE)
{
logger(1,
- "[%8llu, %d] Lost connection to nDPIsrvd Collector",
+ "[%8llu, %zu] Lost connection to nDPIsrvd Collector",
workflow->packets_captured,
reader_thread->array_index);
}
if (saved_errno != EAGAIN)
{
- reader_thread->json_sock_reconnect = 1;
+ reader_thread->collector_sock_reconnect = 1;
}
else
{
- fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) & ~O_NONBLOCK);
+ fcntl(reader_thread->collector_sockfd,
+ F_SETFL,
+ fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK);
off_t pos = (written < 0 ? 0 : written);
- while ((written = write(reader_thread->json_sockfd, newline_json_str + pos, s_ret - pos)) != s_ret - pos)
+ while ((written = write(reader_thread->collector_sockfd, newline_json_str + pos, s_ret - pos)) !=
+ s_ret - pos)
{
if (written < 0)
{
logger(1,
- "[%8llu, %d] Send data (blocking I/O) to nDPIsrvd Collector failed: %s",
+ "[%8llu, %zu] Send data (blocking I/O) to nDPIsrvd Collector failed: %s",
workflow->packets_captured,
reader_thread->array_index,
strerror(saved_errno));
- reader_thread->json_sock_reconnect = 1;
+ reader_thread->collector_sock_reconnect = 1;
break;
}
else
@@ -2168,7 +2181,9 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
pos += written;
}
}
- fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) & O_NONBLOCK);
+ fcntl(reader_thread->collector_sockfd,
+ F_SETFL,
+ fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & O_NONBLOCK);
}
}
}
@@ -2182,7 +2197,7 @@ static void serialize_and_send(struct nDPId_reader_thread * const reader_thread)
if (json_str == NULL || json_str_len == 0)
{
logger(1,
- "[%8llu, %d] jsonize failed, buffer length: %u",
+ "[%8llu, %zu] jsonize failed, buffer length: %u",
reader_thread->workflow->packets_captured,
reader_thread->array_index,
json_str_len);
@@ -2190,7 +2205,7 @@ static void serialize_and_send(struct nDPId_reader_thread * const reader_thread)
else
{
reader_thread->workflow->total_events_serialized++;
- send_to_json_sink(reader_thread, json_str, json_str_len);
+ send_to_collector(reader_thread, json_str, json_str_len);
}
ndpi_reset_serializer(&reader_thread->workflow->ndpi_serializer);
}
@@ -2319,7 +2334,7 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa
if (flow_ext == NULL)
{
logger(1,
- "[%8llu, %d] BUG: got a PACKET_EVENT_PAYLOAD_FLOW with a flow pointer equals NULL",
+ "[%8llu, %zu] BUG: got a PACKET_EVENT_PAYLOAD_FLOW with a flow pointer equals NULL",
reader_thread->workflow->packets_captured,
reader_thread->array_index);
return;
@@ -2340,7 +2355,7 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa
ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, packet_event_name_table[PACKET_EVENT_INVALID]);
}
- jsonize_basic(reader_thread);
+ jsonize_basic(reader_thread, (event == PACKET_EVENT_PAYLOAD_FLOW ? 1 : 0));
if (event == PACKET_EVENT_PAYLOAD_FLOW)
{
@@ -2370,7 +2385,7 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa
if (ndpi_serialize_string_binary(&workflow->ndpi_serializer, "pkt", base64_data, base64_data_len) != 0)
{
logger(1,
- "[%8llu, %d] JSON serializing base64 packet buffer failed",
+ "[%8llu, %zu] JSON serializing base64 packet buffer failed",
reader_thread->workflow->packets_captured,
reader_thread->array_index);
}
@@ -2378,7 +2393,7 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa
else
{
logger(1,
- "[%8llu, %d] Base64 encoding failed with: %s.",
+ "[%8llu, %zu] Base64 encoding failed with: %s.",
reader_thread->workflow->packets_captured,
reader_thread->array_index,
base64_ret_strings[base64_retval]);
@@ -2403,7 +2418,7 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
{
ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[FLOW_EVENT_INVALID]);
}
- jsonize_basic(reader_thread);
+ jsonize_basic(reader_thread, 1);
jsonize_flow(workflow, flow_ext);
jsonize_l3_l4(workflow, &flow_ext->flow_basic);
@@ -2466,7 +2481,7 @@ static void jsonize_flow_detection_event(struct nDPId_reader_thread * const read
{
ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[FLOW_EVENT_INVALID]);
}
- jsonize_basic(reader_thread);
+ jsonize_basic(reader_thread, 1);
jsonize_flow(workflow, &flow_info->flow_extended);
jsonize_l3_l4(workflow, &flow_info->flow_extended.flow_basic);
@@ -2686,7 +2701,16 @@ __attribute__((format(printf, 3, 4))) static void jsonize_basic_eventf(struct nD
"datalink",
pcap_datalink(reader_thread->workflow->pcap_handle));
- jsonize_basic(reader_thread);
+ switch (event)
+ {
+ case MAX_FLOW_TO_TRACK:
+ case FLOW_MEMORY_ALLOCATION_FAILED:
+ jsonize_basic(reader_thread, 1);
+ break;
+ default:
+ jsonize_basic(reader_thread, 0);
+ break;
+ }
if (format != NULL)
{
@@ -3143,10 +3167,12 @@ static void do_periodically_work(struct nDPId_reader_thread * const reader_threa
reader_thread->workflow->last_scan_time = reader_thread->workflow->last_global_time;
}
if (reader_thread->workflow->last_status_time + nDPId_options.daemon_status_interval +
- reader_thread->array_index * 1000 <= reader_thread->workflow->last_global_time)
+ reader_thread->array_index * 1000 <=
+ reader_thread->workflow->last_global_time)
{
jsonize_daemon(reader_thread, DAEMON_EVENT_STATUS);
- reader_thread->workflow->last_status_time = reader_thread->workflow->last_global_time + reader_thread->array_index * 1000;
+ reader_thread->workflow->last_status_time =
+ reader_thread->workflow->last_global_time + reader_thread->array_index * 1000;
}
#ifdef ENABLE_MEMORY_PROFILING
if (reader_thread->workflow->last_memory_usage_log_time + nDPId_options.memory_profiling_log_interval <=
@@ -3159,6 +3185,12 @@ static void do_periodically_work(struct nDPId_reader_thread * const reader_threa
#endif
}
+static int distribute_single_packet(struct nDPId_reader_thread * const reader_thread)
+{
+ return (reader_thread->workflow->packets_captured % nDPId_options.reader_thread_count ==
+ reader_thread->array_index);
+}
+
static void ndpi_process_packet(uint8_t * const args,
struct pcap_pkthdr const * const header,
uint8_t const * const packet)
@@ -3185,7 +3217,7 @@ static void ndpi_process_packet(uint8_t * const args,
uint16_t l4_payload_len = 0;
uint16_t type = 0;
- int thread_index = nDPId_THREAD_DISTRIBUTION_SEED; // generated with `dd if=/dev/random bs=1024 count=1 |& hd'
+ size_t thread_index = nDPId_THREAD_DISTRIBUTION_SEED; // generated with `dd if=/dev/random bs=1024 count=1 |& hd'
if (reader_thread == NULL)
{
@@ -3225,8 +3257,11 @@ static void ndpi_process_packet(uint8_t * const args,
}
else
{
- jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
- jsonize_basic_eventf(reader_thread, UNKNOWN_L3_PROTOCOL, "%s%u", "protocol", type);
+ if (distribute_single_packet(reader_thread) != 0)
+ {
+ jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
+ jsonize_basic_eventf(reader_thread, UNKNOWN_L3_PROTOCOL, "%s%u", "protocol", type);
+ }
return;
}
ip_size = header->caplen - ip_offset;
@@ -3235,14 +3270,17 @@ static void ndpi_process_packet(uint8_t * const args,
{
if (header->caplen < header->len)
{
- jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
- jsonize_basic_eventf(reader_thread,
- CAPTURE_SIZE_SMALLER_THAN_PACKET_SIZE,
- "%s%u %s%u",
- "size",
- header->caplen,
- "expected",
- header->len);
+ if (distribute_single_packet(reader_thread) != 0)
+ {
+ jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
+ jsonize_basic_eventf(reader_thread,
+ CAPTURE_SIZE_SMALLER_THAN_PACKET_SIZE,
+ "%s%u %s%u",
+ "size",
+ header->caplen,
+ "expected",
+ header->len);
+ }
}
}
@@ -3251,9 +3289,17 @@ static void ndpi_process_packet(uint8_t * const args,
{
if (ip_size < sizeof(*ip))
{
- jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
- jsonize_basic_eventf(
- reader_thread, IP4_SIZE_SMALLER_THAN_HEADER, "%s%u %s%zu", "size", ip_size, "expected", sizeof(*ip));
+ if (distribute_single_packet(reader_thread) != 0)
+ {
+ jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
+ jsonize_basic_eventf(reader_thread,
+ IP4_SIZE_SMALLER_THAN_HEADER,
+ "%s%u %s%zu",
+ "size",
+ ip_size,
+ "expected",
+ sizeof(*ip));
+ }
return;
}
@@ -3262,9 +3308,12 @@ static void ndpi_process_packet(uint8_t * const args,
if (ndpi_detection_get_l4(
(uint8_t *)ip, ip_size, &l4_ptr, &l4_len, &flow_basic.l4_protocol, NDPI_DETECTION_ONLY_IPV4) != 0)
{
- jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
- jsonize_basic_eventf(
- reader_thread, IP4_L4_PAYLOAD_DETECTION_FAILED, "%s%zu", "l4_data_len", ip_size - sizeof(*ip));
+ if (distribute_single_packet(reader_thread) != 0)
+ {
+ jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
+ jsonize_basic_eventf(
+ reader_thread, IP4_L4_PAYLOAD_DETECTION_FAILED, "%s%zu", "l4_data_len", ip_size - sizeof(*ip));
+ }
return;
}
@@ -3277,14 +3326,17 @@ static void ndpi_process_packet(uint8_t * const args,
{
if (ip_size < sizeof(ip6->ip6_hdr))
{
- jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
- jsonize_basic_eventf(reader_thread,
- IP6_SIZE_SMALLER_THAN_HEADER,
- "%s%u %s%zu",
- "size",
- ip_size,
- "expected",
- sizeof(ip6->ip6_hdr));
+ if (distribute_single_packet(reader_thread) != 0)
+ {
+ jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
+ jsonize_basic_eventf(reader_thread,
+ IP6_SIZE_SMALLER_THAN_HEADER,
+ "%s%u %s%zu",
+ "size",
+ ip_size,
+ "expected",
+ sizeof(ip6->ip6_hdr));
+ }
return;
}
@@ -3292,9 +3344,12 @@ static void ndpi_process_packet(uint8_t * const args,
if (ndpi_detection_get_l4(
(uint8_t *)ip6, ip_size, &l4_ptr, &l4_len, &flow_basic.l4_protocol, NDPI_DETECTION_ONLY_IPV6) != 0)
{
- jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
- jsonize_basic_eventf(
- reader_thread, IP6_L4_PAYLOAD_DETECTION_FAILED, "%s%zu", "l4_data_len", ip_size - sizeof(*ip));
+ if (distribute_single_packet(reader_thread) != 0)
+ {
+ jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
+ jsonize_basic_eventf(
+ reader_thread, IP6_L4_PAYLOAD_DETECTION_FAILED, "%s%zu", "l4_data_len", ip_size - sizeof(*ip));
+ }
return;
}
@@ -3319,8 +3374,11 @@ static void ndpi_process_packet(uint8_t * const args,
}
else
{
- jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
- jsonize_basic_eventf(reader_thread, UNKNOWN_L3_PROTOCOL, "%s%u", "protocol", type);
+ if (distribute_single_packet(reader_thread) != 0)
+ {
+ jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
+ jsonize_basic_eventf(reader_thread, UNKNOWN_L3_PROTOCOL, "%s%u", "protocol", type);
+ }
return;
}
@@ -3331,15 +3389,25 @@ static void ndpi_process_packet(uint8_t * const args,
if (header->caplen < (l4_ptr - packet) + sizeof(struct ndpi_tcphdr))
{
- jsonize_packet_event(
- reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD);
- jsonize_basic_eventf(reader_thread,
- TCP_PACKET_TOO_SHORT,
- "%s%u %s%zu",
- "size",
- header->caplen,
- "expected",
- (l4_ptr - packet) + sizeof(struct ndpi_tcphdr));
+ if (distribute_single_packet(reader_thread) != 0)
+ {
+ jsonize_packet_event(reader_thread,
+ header,
+ packet,
+ type,
+ ip_offset,
+ (l4_ptr - packet),
+ l4_len,
+ NULL,
+ PACKET_EVENT_PAYLOAD);
+ jsonize_basic_eventf(reader_thread,
+ TCP_PACKET_TOO_SHORT,
+ "%s%u %s%zu",
+ "size",
+ header->caplen,
+ "expected",
+ (l4_ptr - packet) + sizeof(struct ndpi_tcphdr));
+ }
return;
}
tcp = (struct ndpi_tcphdr *)l4_ptr;
@@ -3355,15 +3423,25 @@ static void ndpi_process_packet(uint8_t * const args,
if (header->caplen < (l4_ptr - packet) + sizeof(struct ndpi_udphdr))
{
- jsonize_packet_event(
- reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD);
- jsonize_basic_eventf(reader_thread,
- UDP_PACKET_TOO_SHORT,
- "%s%u %s%zu",
- "size",
- header->caplen,
- "expected",
- (l4_ptr - packet) + sizeof(struct ndpi_udphdr));
+ if (distribute_single_packet(reader_thread) != 0)
+ {
+ jsonize_packet_event(reader_thread,
+ header,
+ packet,
+ type,
+ ip_offset,
+ (l4_ptr - packet),
+ l4_len,
+ NULL,
+ PACKET_EVENT_PAYLOAD);
+ jsonize_basic_eventf(reader_thread,
+ UDP_PACKET_TOO_SHORT,
+ "%s%u %s%zu",
+ "size",
+ header->caplen,
+ "expected",
+ (l4_ptr - packet) + sizeof(struct ndpi_udphdr));
+ }
return;
}
udp = (struct ndpi_udphdr *)l4_ptr;
@@ -3520,31 +3598,57 @@ static void ndpi_process_packet(uint8_t * const args,
if (workflow->cur_active_flows == workflow->max_active_flows)
{
- jsonize_packet_event(
- reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD);
- jsonize_basic_eventf(reader_thread,
- MAX_FLOW_TO_TRACK,
- "%s%llu %s%llu %s%llu %s%llu",
- "current_active",
- workflow->cur_active_flows,
- "current_idle",
- workflow->cur_idle_flows,
- "max_active",
- workflow->max_active_flows,
- "max_idle",
- workflow->max_idle_flows);
+ if (workflow->max_flow_to_track_reached == 0)
+ {
+ workflow->max_flow_to_track_reached = 1;
+
+ jsonize_packet_event(reader_thread,
+ header,
+ packet,
+ type,
+ ip_offset,
+ (l4_ptr - packet),
+ l4_len,
+ NULL,
+ PACKET_EVENT_PAYLOAD);
+ jsonize_basic_eventf(reader_thread,
+ MAX_FLOW_TO_TRACK,
+ "%s%llu %s%llu %s%llu %s%llu",
+ "current_active",
+ workflow->cur_active_flows,
+ "current_idle",
+ workflow->cur_idle_flows,
+ "max_active",
+ workflow->max_active_flows,
+ "max_idle",
+ workflow->max_idle_flows);
+ }
return;
}
+ workflow->max_flow_to_track_reached = 0;
flow_to_process = (struct nDPId_flow_info *)add_new_flow(workflow, &flow_basic, FS_INFO, hashed_index);
if (flow_to_process == NULL)
{
- jsonize_packet_event(
- reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD);
- jsonize_basic_eventf(
- reader_thread, FLOW_MEMORY_ALLOCATION_FAILED, "%s%zu", "size", sizeof(*flow_to_process));
+ if (workflow->flow_allocation_already_failed == 0)
+ {
+ workflow->flow_allocation_already_failed = 1;
+
+ jsonize_packet_event(reader_thread,
+ header,
+ packet,
+ type,
+ ip_offset,
+ (l4_ptr - packet),
+ l4_len,
+ NULL,
+ PACKET_EVENT_PAYLOAD);
+ jsonize_basic_eventf(
+ reader_thread, FLOW_MEMORY_ALLOCATION_FAILED, "%s%zu", "size", sizeof(*flow_to_process));
+ }
return;
}
+ workflow->flow_allocation_already_failed = 0;
workflow->total_active_flows++;
flow_to_process->flow_extended.flow_id = __sync_fetch_and_add(&global_flow_id, 1);
@@ -3788,6 +3892,10 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
nready = epoll_wait(epoll_fd, events, events_size, timeout_ms);
if (errno != 0)
{
+ if (errno == EINTR)
+ {
+ continue;
+ }
logger(1, "Epoll returned error: %s", strerror(errno));
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
break;
@@ -3841,16 +3949,16 @@ static void * processing_thread(void * const ndpi_thread_arg)
{
struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)ndpi_thread_arg;
- reader_thread->json_sockfd = -1;
- reader_thread->json_sock_reconnect = 1;
+ reader_thread->collector_sockfd = -1;
+ reader_thread->collector_sock_reconnect = 1;
errno = 0;
- if (connect_to_json_socket(reader_thread) != 0)
+ if (connect_to_collector(reader_thread) != 0)
{
logger(1,
- "Thread %u: Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s",
+ "Thread %zu: Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s",
reader_thread->array_index,
- nDPId_options.json_sockpath,
+ nDPId_options.collector_sockpath,
(errno != 0 ? strerror(errno) : "Internal Error."));
}
else
@@ -3859,7 +3967,7 @@ static void * processing_thread(void * const ndpi_thread_arg)
}
run_pcap_loop(reader_thread);
- fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) & ~O_NONBLOCK);
+ fcntl(reader_thread->collector_sockfd, F_SETFL, fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK);
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
return NULL;
}
@@ -3982,13 +4090,13 @@ static void process_remaining_flows(void)
{
for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i)
{
- if (fcntl(reader_threads[i].json_sockfd,
+ if (fcntl(reader_threads[i].collector_sockfd,
F_SETFL,
- fcntl(reader_threads[i].json_sockfd, F_GETFL, 0) & ~O_NONBLOCK) == -1)
+ fcntl(reader_threads[i].collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK) == -1)
{
logger(1,
"Could not set JSON fd %d to blocking mode for shutdown: %s",
- reader_threads[i].json_sockfd,
+ reader_threads[i].collector_sockfd,
strerror(errno));
}
@@ -4060,7 +4168,7 @@ static int stop_reader_threads(void)
total_flow_updates += reader_threads[i].workflow->total_flow_updates;
printf(
- "Stopping Thread %2d, processed %llu packets, %llu bytes\n"
+ "Stopping Thread %2zu, processed %llu packets, %llu bytes\n"
"\tskipped flows.....: %8llu, processed flows: %8llu, idle flows....: %8llu\n"
"\tnot detected flows: %8llu, guessed flows..: %8llu, detected flows: %8llu\n"
"\tdetection updates.: %8llu, updated flows..: %8llu\n",
@@ -4265,8 +4373,8 @@ static int nDPId_parse_options(int argc, char ** argv)
}
break;
case 'c':
- strncpy(nDPId_options.json_sockpath, optarg, sizeof(nDPId_options.json_sockpath) - 1);
- nDPId_options.json_sockpath[sizeof(nDPId_options.json_sockpath) - 1] = '\0';
+ strncpy(nDPId_options.collector_sockpath, optarg, sizeof(nDPId_options.collector_sockpath) - 1);
+ nDPId_options.collector_sockpath[sizeof(nDPId_options.collector_sockpath) - 1] = '\0';
break;
case 'd':
daemonize_enable();
@@ -4441,17 +4549,17 @@ static int validate_options(void)
}
}
#endif
- if (is_path_absolute("JSON socket", nDPId_options.json_sockpath) != 0)
+ if (is_path_absolute("Collector socket", nDPId_options.collector_sockpath) != 0)
{
retval = 1;
}
{
struct sockaddr_un saddr;
- if (strlen(nDPId_options.json_sockpath) >= sizeof(saddr.sun_path))
+ if (strlen(nDPId_options.collector_sockpath) >= sizeof(saddr.sun_path))
{
logger_early(1,
- "JSON socket path too long, current/max: %zu/%zu",
- strlen(nDPId_options.json_sockpath),
+ "Collector socket path too long, current/max: %zu/%zu",
+ strlen(nDPId_options.collector_sockpath),
sizeof(saddr.sun_path) - 1);
retval = 1;
}