summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2023-12-14 15:38:38 +0100
committerToni Uhlig <matzeton@googlemail.com>2023-12-14 15:38:38 +0100
commitfbe07fd8821c1ca0f310f83913fb9d4dfac5d01a (patch)
treef3f4ad20205c9dd4c568141af9dd6ad69108e97e /examples
parent5432b06665db6bcc2f8b615dbfd8004d02cbc034 (diff)
Improved InfluxDB push daemon.
* fixed severity parsing and gauge handling * added flow state gauges * flow related gauges are only increased/decreased if a "new" event was seen (except for bytes xfer) Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'examples')
-rw-r--r--examples/c-influxd/c-influxd.c206
1 files changed, 149 insertions, 57 deletions
diff --git a/examples/c-influxd/c-influxd.c b/examples/c-influxd/c-influxd.c
index 469e40d67..caa8be20c 100644
--- a/examples/c-influxd/c-influxd.c
+++ b/examples/c-influxd/c-influxd.c
@@ -12,6 +12,7 @@
#include "utils.h"
#define MAX_RISKS_PER_FLOW 8
+#define MAX_SEVERITIES_PER_FLOW 4
static int main_thread_shutdown = 0;
static int influxd_timerfd = -1;
@@ -30,21 +31,23 @@ struct flow_user_data
nDPIsrvd_ull last_flow_src_l4_payload_len;
nDPIsrvd_ull last_flow_dst_l4_payload_len;
uint8_t risks[MAX_RISKS_PER_FLOW];
+ uint8_t severities[MAX_SEVERITIES_PER_FLOW];
uint8_t category;
uint8_t breed;
uint8_t confidence;
- uint8_t severity;
// "fallthroughs" if we are not in sync with nDPI
uint8_t risk_ndpid_invalid : 1;
uint8_t category_ndpid_invalid : 1;
uint8_t breed_ndpid_invalid : 1;
uint8_t confidence_ndpid_invalid : 1;
- uint8_t severity_ndpid_invalid : 1;
// detection status
uint8_t new_seen : 1;
uint8_t is_detected : 1;
uint8_t is_guessed : 1;
uint8_t is_not_detected : 1;
+ // flow state
+ uint8_t is_info : 1;
+ uint8_t is_finished : 1;
// Layer3 / Layer4
uint8_t is_ip4 : 1;
uint8_t is_ip6 : 1;
@@ -114,6 +117,9 @@ static struct
struct
{
+ uint64_t flow_state_info;
+ uint64_t flow_state_finished;
+
uint64_t flow_breed_safe_count;
uint64_t flow_breed_acceptable_count;
uint64_t flow_breed_fun_count;
@@ -398,6 +404,14 @@ static int serialize_influx_line(char * buf, size_t siz)
bytes = snprintf(buf,
siz,
+ "%s " INFLUXDB_FORMAT() INFLUXDB_FORMAT_END(),
+ "state",
+ INFLUXDB_VALUE_GAUGE(flow_state_info),
+ INFLUXDB_VALUE_GAUGE(flow_state_finished));
+ CHECK_SNPRINTF_RET(bytes);
+
+ bytes = snprintf(buf,
+ siz,
"%s " INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT()
INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT_END(),
"breed",
@@ -754,6 +768,16 @@ static void influxd_unmap_flow_from_stat(struct flow_user_data * const flow_user
influxd_statistics.gauges.flow_not_detected_count--;
}
+ if (flow_user_data->is_info != 0)
+ {
+ influxd_statistics.gauges.flow_state_info--;
+ }
+
+ if (flow_user_data->is_finished != 0)
+ {
+ influxd_statistics.gauges.flow_state_finished--;
+ }
+
if (flow_user_data->breed > 0 && flow_user_data->breed_ndpid_invalid == 0 &&
breeds_map[flow_user_data->breed - 1].global_stat != NULL)
{
@@ -772,10 +796,12 @@ static void influxd_unmap_flow_from_stat(struct flow_user_data * const flow_user
(*confidence_map[flow_user_data->confidence - 1].global_stat)--;
}
- if (flow_user_data->severity > 0 && flow_user_data->severity_ndpid_invalid == 0 &&
- severity_map[flow_user_data->severity - 1].global_stat != NULL)
+ for (uint8_t i = 0; i < MAX_SEVERITIES_PER_FLOW; ++i)
{
- (*severity_map[flow_user_data->severity - 1].global_stat)--;
+ if (flow_user_data->severities[i] > 0)
+ {
+ (*severity_map[flow_user_data->severities[i] - 1].global_stat)--;
+ }
}
for (uint8_t i = 0; i < MAX_RISKS_PER_FLOW; ++i)
@@ -850,6 +876,7 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs
{
struct flow_user_data * flow_user_data;
struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name");
+ struct nDPIsrvd_json_token const * const flow_state = TOKEN_GET_SZ(sock, "flow_state");
nDPIsrvd_ull total_bytes_ull[2];
if (flow == NULL)
@@ -862,20 +889,6 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs
return;
}
- if (TOKEN_VALUE_TO_ULL(sock, TOKEN_GET_SZ(sock, "flow_src_tot_l4_payload_len"), &total_bytes_ull[0]) ==
- CONVERSION_OK &&
- TOKEN_VALUE_TO_ULL(sock, TOKEN_GET_SZ(sock, "flow_dst_tot_l4_payload_len"), &total_bytes_ull[1]) ==
- CONVERSION_OK)
- {
- influxd_statistics.counters.flow_src_total_bytes +=
- total_bytes_ull[0] - flow_user_data->last_flow_src_l4_payload_len;
- influxd_statistics.counters.flow_dst_total_bytes +=
- total_bytes_ull[1] - flow_user_data->last_flow_dst_l4_payload_len;
-
- flow_user_data->last_flow_src_l4_payload_len = total_bytes_ull[0];
- flow_user_data->last_flow_dst_l4_payload_len = total_bytes_ull[1];
- }
-
if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "new") != 0)
{
flow_user_data->new_seen = 1;
@@ -920,7 +933,12 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs
influxd_statistics.gauges.flow_l4_other_count++;
}
}
- else if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "not-detected") != 0)
+ else if (flow_user_data->new_seen == 0)
+ {
+ return;
+ }
+
+ if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "not-detected") != 0)
{
flow_user_data->is_not_detected = 1;
influxd_statistics.gauges.flow_not_detected_count++;
@@ -962,30 +980,50 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs
strncpy(numeric_risk_buf, numeric_risk_str, numeric_risk_len);
numeric_risk_buf[numeric_risk_len] = '\0';
- if (flow_user_data->severity == 0 && flow_user_data->severity_ndpid_invalid == 0)
+ struct nDPIsrvd_json_token const * const severity =
+ TOKEN_GET_SZ(sock, "ndpi", "flow_risk", numeric_risk_buf, "severity");
+ uint8_t severity_index;
+
+ if (influxd_map_flow_u8(
+ sock, severity, severity_map, nDPIsrvd_ARRAY_LENGTH(severity_map), &severity_index) != 0)
{
- struct nDPIsrvd_json_token const * const severity =
- TOKEN_GET_SZ(sock, "ndpi", "flow_risk", numeric_risk_buf, "severity");
- if (influxd_map_flow_u8(sock,
- severity,
- severity_map,
- nDPIsrvd_ARRAY_LENGTH(severity_map),
- &flow_user_data->severity) != 0 ||
- influxd_map_value_to_stat(
- sock, severity, severity_map, nDPIsrvd_ARRAY_LENGTH(severity_map)) != 0)
+ severity_index = 0;
+ }
+
+ if (severity_index != 0)
+ {
+ for (uint8_t i = 0; i < MAX_SEVERITIES_PER_FLOW; ++i)
{
- size_t value_len = 0;
- char const * const value_str = TOKEN_GET_VALUE(sock, severity, &value_len);
+ if (flow_user_data->severities[i] != 0)
+ {
+ continue;
+ }
+ if (flow_user_data->severities[i] == severity_index)
+ {
+ break;
+ }
- flow_user_data->severity = 0;
- flow_user_data->severity_ndpid_invalid = 1;
- if (value_len > 0 && value_str != NULL)
+ if (influxd_map_value_to_stat(
+ sock, severity, severity_map, nDPIsrvd_ARRAY_LENGTH(severity_map)) != 0)
{
- logger(1,
- "Unknown/Invalid JSON value for key 'ndpi','breed': %.*s",
- (int)value_len,
- value_str);
+ severity_index = 0;
+ break;
}
+ flow_user_data->severities[i] = severity_index;
+ break;
+ }
+ }
+ if (severity_index == 0)
+ {
+ size_t value_len = 0;
+ char const * const value_str = TOKEN_GET_VALUE(sock, severity, &value_len);
+
+ if (value_len > 0 && value_str != NULL)
+ {
+ logger(1,
+ "Unknown/Invalid JSON value for key 'ndpi','breed': %.*s",
+ (int)value_len,
+ value_str);
}
}
@@ -999,9 +1037,14 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs
{
continue;
}
+ if (flow_user_data->risks[i] == numeric_risk_value)
+ {
+ break;
+ }
influxd_statistics.gauges.flow_risk_count[numeric_risk_value]++;
flow_user_data->risks[i] = numeric_risk_value;
+ break;
}
}
else if (flow_user_data->risk_ndpid_invalid == 0)
@@ -1099,8 +1142,45 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs
}
}
}
- else if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "end") != 0 ||
- TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "idle") != 0)
+
+ if (TOKEN_VALUE_EQUALS_SZ(sock, flow_state, "info") != 0)
+ {
+ if (flow_user_data->is_info == 0)
+ {
+ flow_user_data->is_info = 1;
+ influxd_statistics.gauges.flow_state_info++;
+ }
+ }
+ else if (TOKEN_VALUE_EQUALS_SZ(sock, flow_state, "finished") != 0)
+ {
+ if (flow_user_data->is_finished == 0)
+ {
+ if (flow_user_data->is_info != 0)
+ {
+ flow_user_data->is_info = 0;
+ influxd_statistics.gauges.flow_state_info--;
+ }
+ flow_user_data->is_finished = 1;
+ influxd_statistics.gauges.flow_state_finished++;
+ }
+ }
+
+ if (TOKEN_VALUE_TO_ULL(sock, TOKEN_GET_SZ(sock, "flow_src_tot_l4_payload_len"), &total_bytes_ull[0]) ==
+ CONVERSION_OK &&
+ TOKEN_VALUE_TO_ULL(sock, TOKEN_GET_SZ(sock, "flow_dst_tot_l4_payload_len"), &total_bytes_ull[1]) ==
+ CONVERSION_OK)
+ {
+ influxd_statistics.counters.flow_src_total_bytes +=
+ total_bytes_ull[0] - flow_user_data->last_flow_src_l4_payload_len;
+ influxd_statistics.counters.flow_dst_total_bytes +=
+ total_bytes_ull[1] - flow_user_data->last_flow_dst_l4_payload_len;
+
+ flow_user_data->last_flow_src_l4_payload_len = total_bytes_ull[0];
+ flow_user_data->last_flow_dst_l4_payload_len = total_bytes_ull[1];
+ }
+
+ if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "end") != 0 ||
+ TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "idle") != 0)
{
influxd_unmap_flow_from_stat(flow_user_data);
}
@@ -1114,6 +1194,11 @@ static enum nDPIsrvd_callback_return influxd_json_callback(struct nDPIsrvd_socke
(void)instance;
(void)thread_data;
+ struct nDPIsrvd_json_token const * const flow_event = TOKEN_GET_SZ(sock, "flow_event_name");
+ struct nDPIsrvd_json_token const * const packet_event = TOKEN_GET_SZ(sock, "packet_event_name");
+ struct nDPIsrvd_json_token const * const daemon_event = TOKEN_GET_SZ(sock, "daemon_event_name");
+ struct nDPIsrvd_json_token const * const error_event = TOKEN_GET_SZ(sock, "error_event_name");
+
pthread_mutex_lock(&influxd_statistics.rw_lock);
influxd_statistics.counters.json_lines++;
@@ -1121,22 +1206,29 @@ static enum nDPIsrvd_callback_return influxd_json_callback(struct nDPIsrvd_socke
process_flow_stats(sock, flow);
- influxd_map_value_to_stat(sock,
- TOKEN_GET_SZ(sock, "flow_event_name"),
- flow_event_map,
- nDPIsrvd_ARRAY_LENGTH(flow_event_map));
- influxd_map_value_to_stat(sock,
- TOKEN_GET_SZ(sock, "packet_event_name"),
- packet_event_map,
- nDPIsrvd_ARRAY_LENGTH(packet_event_map));
- influxd_map_value_to_stat(sock,
- TOKEN_GET_SZ(sock, "daemon_event_name"),
- daemon_event_map,
- nDPIsrvd_ARRAY_LENGTH(daemon_event_map));
- influxd_map_value_to_stat(sock,
- TOKEN_GET_SZ(sock, "error_event_name"),
- error_event_map,
- nDPIsrvd_ARRAY_LENGTH(error_event_map));
+ if (flow_event != NULL &&
+ influxd_map_value_to_stat(sock, flow_event, flow_event_map, nDPIsrvd_ARRAY_LENGTH(flow_event_map)) != 0)
+ {
+ logger(1, "%s", "Unknown flow_event_name");
+ }
+
+ if (packet_event != NULL &&
+ influxd_map_value_to_stat(sock, packet_event, packet_event_map, nDPIsrvd_ARRAY_LENGTH(packet_event_map)) != 0)
+ {
+ logger(1, "%s", "Unknown packet_event_name");
+ }
+
+ if (daemon_event != NULL &&
+ influxd_map_value_to_stat(sock, daemon_event, daemon_event_map, nDPIsrvd_ARRAY_LENGTH(daemon_event_map)) != 0)
+ {
+ logger(1, "%s", "Unknown daemon_event_name");
+ }
+
+ if (error_event != NULL &&
+ influxd_map_value_to_stat(sock, error_event, error_event_map, nDPIsrvd_ARRAY_LENGTH(error_event_map)) != 0)
+ {
+ logger(1, "%s", "Unknown error_event_name");
+ }
pthread_mutex_unlock(&influxd_statistics.rw_lock);
return CALLBACK_OK;