diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2023-12-14 15:38:38 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2023-12-14 15:38:38 +0100 |
commit | fbe07fd8821c1ca0f310f83913fb9d4dfac5d01a (patch) | |
tree | f3f4ad20205c9dd4c568141af9dd6ad69108e97e /examples | |
parent | 5432b06665db6bcc2f8b615dbfd8004d02cbc034 (diff) |
Improved InfluxDB push daemon.
* fixed severity parsing and gauge handling
* added flow state gauges
* flow related gauges are only increased/decreased if a "new" event was seen (except for bytes xfer)
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'examples')
-rw-r--r-- | examples/c-influxd/c-influxd.c | 206 |
1 files changed, 149 insertions, 57 deletions
diff --git a/examples/c-influxd/c-influxd.c b/examples/c-influxd/c-influxd.c index 469e40d67..caa8be20c 100644 --- a/examples/c-influxd/c-influxd.c +++ b/examples/c-influxd/c-influxd.c @@ -12,6 +12,7 @@ #include "utils.h" #define MAX_RISKS_PER_FLOW 8 +#define MAX_SEVERITIES_PER_FLOW 4 static int main_thread_shutdown = 0; static int influxd_timerfd = -1; @@ -30,21 +31,23 @@ struct flow_user_data nDPIsrvd_ull last_flow_src_l4_payload_len; nDPIsrvd_ull last_flow_dst_l4_payload_len; uint8_t risks[MAX_RISKS_PER_FLOW]; + uint8_t severities[MAX_SEVERITIES_PER_FLOW]; uint8_t category; uint8_t breed; uint8_t confidence; - uint8_t severity; // "fallthroughs" if we are not in sync with nDPI uint8_t risk_ndpid_invalid : 1; uint8_t category_ndpid_invalid : 1; uint8_t breed_ndpid_invalid : 1; uint8_t confidence_ndpid_invalid : 1; - uint8_t severity_ndpid_invalid : 1; // detection status uint8_t new_seen : 1; uint8_t is_detected : 1; uint8_t is_guessed : 1; uint8_t is_not_detected : 1; + // flow state + uint8_t is_info : 1; + uint8_t is_finished : 1; // Layer3 / Layer4 uint8_t is_ip4 : 1; uint8_t is_ip6 : 1; @@ -114,6 +117,9 @@ static struct struct { + uint64_t flow_state_info; + uint64_t flow_state_finished; + uint64_t flow_breed_safe_count; uint64_t flow_breed_acceptable_count; uint64_t flow_breed_fun_count; @@ -398,6 +404,14 @@ static int serialize_influx_line(char * buf, size_t siz) bytes = snprintf(buf, siz, + "%s " INFLUXDB_FORMAT() INFLUXDB_FORMAT_END(), + "state", + INFLUXDB_VALUE_GAUGE(flow_state_info), + INFLUXDB_VALUE_GAUGE(flow_state_finished)); + CHECK_SNPRINTF_RET(bytes); + + bytes = snprintf(buf, + siz, "%s " INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT_END(), "breed", @@ -754,6 +768,16 @@ static void influxd_unmap_flow_from_stat(struct flow_user_data * const flow_user influxd_statistics.gauges.flow_not_detected_count--; } + if (flow_user_data->is_info != 0) + { + influxd_statistics.gauges.flow_state_info--; + } + + if (flow_user_data->is_finished != 0) + { + influxd_statistics.gauges.flow_state_finished--; + } + if (flow_user_data->breed > 0 && flow_user_data->breed_ndpid_invalid == 0 && breeds_map[flow_user_data->breed - 1].global_stat != NULL) { @@ -772,10 +796,12 @@ static void influxd_unmap_flow_from_stat(struct flow_user_data * const flow_user (*confidence_map[flow_user_data->confidence - 1].global_stat)--; } - if (flow_user_data->severity > 0 && flow_user_data->severity_ndpid_invalid == 0 && - severity_map[flow_user_data->severity - 1].global_stat != NULL) + for (uint8_t i = 0; i < MAX_SEVERITIES_PER_FLOW; ++i) { - (*severity_map[flow_user_data->severity - 1].global_stat)--; + if (flow_user_data->severities[i] > 0) + { + (*severity_map[flow_user_data->severities[i] - 1].global_stat)--; + } } for (uint8_t i = 0; i < MAX_RISKS_PER_FLOW; ++i) @@ -850,6 +876,7 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs { struct flow_user_data * flow_user_data; struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name"); + struct nDPIsrvd_json_token const * const flow_state = TOKEN_GET_SZ(sock, "flow_state"); nDPIsrvd_ull total_bytes_ull[2]; if (flow == NULL) @@ -862,20 +889,6 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs return; } - if (TOKEN_VALUE_TO_ULL(sock, TOKEN_GET_SZ(sock, "flow_src_tot_l4_payload_len"), &total_bytes_ull[0]) == - CONVERSION_OK && - TOKEN_VALUE_TO_ULL(sock, TOKEN_GET_SZ(sock, "flow_dst_tot_l4_payload_len"), &total_bytes_ull[1]) == - CONVERSION_OK) - { - influxd_statistics.counters.flow_src_total_bytes += - total_bytes_ull[0] - flow_user_data->last_flow_src_l4_payload_len; - influxd_statistics.counters.flow_dst_total_bytes += - total_bytes_ull[1] - flow_user_data->last_flow_dst_l4_payload_len; - - flow_user_data->last_flow_src_l4_payload_len = total_bytes_ull[0]; - flow_user_data->last_flow_dst_l4_payload_len = total_bytes_ull[1]; - } - if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "new") != 0) { flow_user_data->new_seen = 1; @@ -920,7 +933,12 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs influxd_statistics.gauges.flow_l4_other_count++; } } - else if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "not-detected") != 0) + else if (flow_user_data->new_seen == 0) + { + return; + } + + if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "not-detected") != 0) { flow_user_data->is_not_detected = 1; influxd_statistics.gauges.flow_not_detected_count++; @@ -962,30 +980,50 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs strncpy(numeric_risk_buf, numeric_risk_str, numeric_risk_len); numeric_risk_buf[numeric_risk_len] = '\0'; - if (flow_user_data->severity == 0 && flow_user_data->severity_ndpid_invalid == 0) + struct nDPIsrvd_json_token const * const severity = + TOKEN_GET_SZ(sock, "ndpi", "flow_risk", numeric_risk_buf, "severity"); + uint8_t severity_index; + + if (influxd_map_flow_u8( + sock, severity, severity_map, nDPIsrvd_ARRAY_LENGTH(severity_map), &severity_index) != 0) { - struct nDPIsrvd_json_token const * const severity = - TOKEN_GET_SZ(sock, "ndpi", "flow_risk", numeric_risk_buf, "severity"); - if (influxd_map_flow_u8(sock, - severity, - severity_map, - nDPIsrvd_ARRAY_LENGTH(severity_map), - &flow_user_data->severity) != 0 || - influxd_map_value_to_stat( - sock, severity, severity_map, nDPIsrvd_ARRAY_LENGTH(severity_map)) != 0) + severity_index = 0; + } + + if (severity_index != 0) + { + for (uint8_t i = 0; i < MAX_SEVERITIES_PER_FLOW; ++i) { - size_t value_len = 0; - char const * const value_str = TOKEN_GET_VALUE(sock, severity, &value_len); + if (flow_user_data->severities[i] != 0) + { + continue; + } + if (flow_user_data->severities[i] == severity_index) + { + break; + } - flow_user_data->severity = 0; - flow_user_data->severity_ndpid_invalid = 1; - if (value_len > 0 && value_str != NULL) + if (influxd_map_value_to_stat( + sock, severity, severity_map, nDPIsrvd_ARRAY_LENGTH(severity_map)) != 0) { - logger(1, - "Unknown/Invalid JSON value for key 'ndpi','breed': %.*s", - (int)value_len, - value_str); + severity_index = 0; + break; } + flow_user_data->severities[i] = severity_index; + break; + } + } + if (severity_index == 0) + { + size_t value_len = 0; + char const * const value_str = TOKEN_GET_VALUE(sock, severity, &value_len); + + if (value_len > 0 && value_str != NULL) + { + logger(1, + "Unknown/Invalid JSON value for key 'ndpi','breed': %.*s", + (int)value_len, + value_str); } } @@ -999,9 +1037,14 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs { continue; } + if (flow_user_data->risks[i] == numeric_risk_value) + { + break; + } influxd_statistics.gauges.flow_risk_count[numeric_risk_value]++; flow_user_data->risks[i] = numeric_risk_value; + break; } } else if (flow_user_data->risk_ndpid_invalid == 0) @@ -1099,8 +1142,45 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs } } } - else if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "end") != 0 || - TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "idle") != 0) + + if (TOKEN_VALUE_EQUALS_SZ(sock, flow_state, "info") != 0) + { + if (flow_user_data->is_info == 0) + { + flow_user_data->is_info = 1; + influxd_statistics.gauges.flow_state_info++; + } + } + else if (TOKEN_VALUE_EQUALS_SZ(sock, flow_state, "finished") != 0) + { + if (flow_user_data->is_finished == 0) + { + if (flow_user_data->is_info != 0) + { + flow_user_data->is_info = 0; + influxd_statistics.gauges.flow_state_info--; + } + flow_user_data->is_finished = 1; + influxd_statistics.gauges.flow_state_finished++; + } + } + + if (TOKEN_VALUE_TO_ULL(sock, TOKEN_GET_SZ(sock, "flow_src_tot_l4_payload_len"), &total_bytes_ull[0]) == + CONVERSION_OK && + TOKEN_VALUE_TO_ULL(sock, TOKEN_GET_SZ(sock, "flow_dst_tot_l4_payload_len"), &total_bytes_ull[1]) == + CONVERSION_OK) + { + influxd_statistics.counters.flow_src_total_bytes += + total_bytes_ull[0] - flow_user_data->last_flow_src_l4_payload_len; + influxd_statistics.counters.flow_dst_total_bytes += + total_bytes_ull[1] - flow_user_data->last_flow_dst_l4_payload_len; + + flow_user_data->last_flow_src_l4_payload_len = total_bytes_ull[0]; + flow_user_data->last_flow_dst_l4_payload_len = total_bytes_ull[1]; + } + + if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "end") != 0 || + TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "idle") != 0) { influxd_unmap_flow_from_stat(flow_user_data); } @@ -1114,6 +1194,11 @@ static enum nDPIsrvd_callback_return influxd_json_callback(struct nDPIsrvd_socke (void)instance; (void)thread_data; + struct nDPIsrvd_json_token const * const flow_event = TOKEN_GET_SZ(sock, "flow_event_name"); + struct nDPIsrvd_json_token const * const packet_event = TOKEN_GET_SZ(sock, "packet_event_name"); + struct nDPIsrvd_json_token const * const daemon_event = TOKEN_GET_SZ(sock, "daemon_event_name"); + struct nDPIsrvd_json_token const * const error_event = TOKEN_GET_SZ(sock, "error_event_name"); + pthread_mutex_lock(&influxd_statistics.rw_lock); influxd_statistics.counters.json_lines++; @@ -1121,22 +1206,29 @@ static enum nDPIsrvd_callback_return influxd_json_callback(struct nDPIsrvd_socke process_flow_stats(sock, flow); - influxd_map_value_to_stat(sock, - TOKEN_GET_SZ(sock, "flow_event_name"), - flow_event_map, - nDPIsrvd_ARRAY_LENGTH(flow_event_map)); - influxd_map_value_to_stat(sock, - TOKEN_GET_SZ(sock, "packet_event_name"), - packet_event_map, - nDPIsrvd_ARRAY_LENGTH(packet_event_map)); - influxd_map_value_to_stat(sock, - TOKEN_GET_SZ(sock, "daemon_event_name"), - daemon_event_map, - nDPIsrvd_ARRAY_LENGTH(daemon_event_map)); - influxd_map_value_to_stat(sock, - TOKEN_GET_SZ(sock, "error_event_name"), - error_event_map, - nDPIsrvd_ARRAY_LENGTH(error_event_map)); + if (flow_event != NULL && + influxd_map_value_to_stat(sock, flow_event, flow_event_map, nDPIsrvd_ARRAY_LENGTH(flow_event_map)) != 0) + { + logger(1, "%s", "Unknown flow_event_name"); + } + + if (packet_event != NULL && + influxd_map_value_to_stat(sock, packet_event, packet_event_map, nDPIsrvd_ARRAY_LENGTH(packet_event_map)) != 0) + { + logger(1, "%s", "Unknown packet_event_name"); + } + + if (daemon_event != NULL && + influxd_map_value_to_stat(sock, daemon_event, daemon_event_map, nDPIsrvd_ARRAY_LENGTH(daemon_event_map)) != 0) + { + logger(1, "%s", "Unknown daemon_event_name"); + } + + if (error_event != NULL && + influxd_map_value_to_stat(sock, error_event, error_event_map, nDPIsrvd_ARRAY_LENGTH(error_event_map)) != 0) + { + logger(1, "%s", "Unknown error_event_name"); + } pthread_mutex_unlock(&influxd_statistics.rw_lock); return CALLBACK_OK; |