From abce6d4023c2b76f23e75deff446121687abe5d9 Mon Sep 17 00:00:00 2001 From: Toni Date: Mon, 27 May 2024 09:46:32 +0200 Subject: Improved Kafka dissector. (#2456) * detect more Kafka request packet's * requires less flow memory * same detection behavior as before e.g. no asym detection implemented (can be done by dissecting responses, requires more effort) Signed-off-by: Toni Uhlig Co-authored-by: Nardi Ivan --- src/lib/protocols/kafka.c | 51 +++++++++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 19 deletions(-) (limited to 'src/lib/protocols/kafka.c') diff --git a/src/lib/protocols/kafka.c b/src/lib/protocols/kafka.c index cffd1f32f..abf0ae3ca 100644 --- a/src/lib/protocols/kafka.c +++ b/src/lib/protocols/kafka.c @@ -29,6 +29,14 @@ #include "ndpi_api.h" #include "ndpi_private.h" +static void ndpi_int_kafka_add_connection(struct ndpi_detection_module_struct *ndpi_struct, + struct ndpi_flow_struct *flow) +{ + NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n"); + ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA, + NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI); +} + static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct, struct ndpi_flow_struct *flow) { @@ -41,32 +49,37 @@ static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct, * API keys: https://kafka.apache.org/protocol.html#protocol_api_keys * API versions: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+APIs */ - if (packet->payload_packet_len > 40 && - ntohl(get_u_int32_t(packet->payload, 0)) == (u_int32_t)(packet->payload_packet_len-4)) + if (packet->payload_packet_len < 8 /* min. required packet length */ || + ntohl(get_u_int32_t(packet->payload, 0)) != (uint32_t)(packet->payload_packet_len - 4)) + { + NDPI_EXCLUDE_PROTO(ndpi_struct, flow); + return; + } + + /* Request */ + if (ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */ + ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */) { - /* Request */ - if (!flow->l4.tcp.kafka_stage && - current_pkt_from_client_to_server(ndpi_struct, flow) && - ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */ - ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */) + if (packet->payload_packet_len < 14) { - flow->l4.tcp.kafka_correlation_id = ntohl(get_u_int16_t(packet->payload, 8)); - flow->l4.tcp.kafka_stage = 1; - return; + NDPI_EXCLUDE_PROTO(ndpi_struct, flow); + return; } - /* Response */ - if (flow->l4.tcp.kafka_stage == 1 && - current_pkt_from_server_to_client(ndpi_struct, flow)) + const uint16_t client_id_len = ntohs(get_u_int16_t(packet->payload, 12)); + if (client_id_len + 12 + 2 > packet->payload_packet_len) { - if (ntohl(get_u_int16_t(packet->payload, 4)) == flow->l4.tcp.kafka_correlation_id) - { - NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n"); - ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA, - NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI); + NDPI_EXCLUDE_PROTO(ndpi_struct, flow); return; - } } + if (ndpi_is_printable_buffer(&packet->payload[14], client_id_len) == 0) + { + NDPI_EXCLUDE_PROTO(ndpi_struct, flow); + return; + } + + ndpi_int_kafka_add_connection(ndpi_struct, flow); + return; } NDPI_EXCLUDE_PROTO(ndpi_struct, flow); -- cgit v1.2.3