aboutsummaryrefslogtreecommitdiff
path: root/src/lib/protocols/kafka.c
diff options
context:
space:
mode:
authorVladimir Gavrilov <105977161+0xA50C1A1@users.noreply.github.com>2024-04-06 17:59:38 +0300
committerGitHub <noreply@github.com>2024-04-06 16:59:38 +0200
commitb535033674b1cd6d2d6397bc7c7d803521791bf3 (patch)
tree4c3615dad74c9789cccff1229ec7c0fe2aece9a0 /src/lib/protocols/kafka.c
parent700637a162ba0d3350cff0ff16331ffaa6c2f841 (diff)
Get rid of Apache Kafka false positives (#2372)
Diffstat (limited to 'src/lib/protocols/kafka.c')
-rw-r--r--src/lib/protocols/kafka.c38
1 files changed, 30 insertions, 8 deletions
diff --git a/src/lib/protocols/kafka.c b/src/lib/protocols/kafka.c
index 09c9b9c0c..1595b43be 100644
--- a/src/lib/protocols/kafka.c
+++ b/src/lib/protocols/kafka.c
@@ -36,15 +36,37 @@ static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct,
NDPI_LOG_DBG(ndpi_struct, "search Apache Kafka\n");
- if (current_pkt_from_client_to_server(ndpi_struct, flow) &&
- packet->payload_packet_len > 40 &&
- ntohl(get_u_int32_t(packet->payload, 0)) == (u_int32_t)(packet->payload_packet_len-4) &&
- ntohs(get_u_int16_t(packet->payload, 4)) < 69)
+ /* All Kafka stuff start with 4 bytes containing the payload length
+ * minus 4 bytes.
+ * API keys: https://kafka.apache.org/protocol.html#protocol_api_keys
+ * API versions: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+APIs
+ */
+ if (packet->payload_packet_len > 40 &&
+ ntohl(get_u_int32_t(packet->payload, 0)) == (u_int32_t)(packet->payload_packet_len-4))
{
- NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n");
- ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA,
- NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI);
- return;
+ /* Request */
+ if (!flow->l4.tcp.kafka_stage &&
+ current_pkt_from_client_to_server(ndpi_struct, flow) &&
+ ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */
+ ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */)
+ {
+ flow->l4.tcp.kafka_correlation_id = ntohl(get_u_int16_t(packet->payload, 8));
+ flow->l4.tcp.kafka_stage = 1;
+ return;
+ }
+
+ /* Response */
+ if (flow->l4.tcp.kafka_stage == 1 &&
+ current_pkt_from_server_to_client(ndpi_struct, flow))
+ {
+ if (ntohl(get_u_int16_t(packet->payload, 4)) == flow->l4.tcp.kafka_correlation_id)
+ {
+ NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n");
+ ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA,
+ NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI);
+ return;
+ }
+ }
}
NDPI_EXCLUDE_PROTO(ndpi_struct, flow);