aboutsummaryrefslogtreecommitdiff
path: root/src/lib/protocols/kafka.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/protocols/kafka.c')
-rw-r--r--src/lib/protocols/kafka.c51
1 files changed, 32 insertions, 19 deletions
diff --git a/src/lib/protocols/kafka.c b/src/lib/protocols/kafka.c
index cffd1f32f..abf0ae3ca 100644
--- a/src/lib/protocols/kafka.c
+++ b/src/lib/protocols/kafka.c
@@ -29,6 +29,14 @@
#include "ndpi_api.h"
#include "ndpi_private.h"
+static void ndpi_int_kafka_add_connection(struct ndpi_detection_module_struct *ndpi_struct,
+ struct ndpi_flow_struct *flow)
+{
+ NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n");
+ ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA,
+ NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI);
+}
+
static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct,
struct ndpi_flow_struct *flow)
{
@@ -41,32 +49,37 @@ static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct,
* API keys: https://kafka.apache.org/protocol.html#protocol_api_keys
* API versions: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+APIs
*/
- if (packet->payload_packet_len > 40 &&
- ntohl(get_u_int32_t(packet->payload, 0)) == (u_int32_t)(packet->payload_packet_len-4))
+ if (packet->payload_packet_len < 8 /* min. required packet length */ ||
+ ntohl(get_u_int32_t(packet->payload, 0)) != (uint32_t)(packet->payload_packet_len - 4))
+ {
+ NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
+ return;
+ }
+
+ /* Request */
+ if (ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */
+ ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */)
{
- /* Request */
- if (!flow->l4.tcp.kafka_stage &&
- current_pkt_from_client_to_server(ndpi_struct, flow) &&
- ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */
- ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */)
+ if (packet->payload_packet_len < 14)
{
- flow->l4.tcp.kafka_correlation_id = ntohl(get_u_int16_t(packet->payload, 8));
- flow->l4.tcp.kafka_stage = 1;
- return;
+ NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
+ return;
}
- /* Response */
- if (flow->l4.tcp.kafka_stage == 1 &&
- current_pkt_from_server_to_client(ndpi_struct, flow))
+ const uint16_t client_id_len = ntohs(get_u_int16_t(packet->payload, 12));
+ if (client_id_len + 12 + 2 > packet->payload_packet_len)
{
- if (ntohl(get_u_int16_t(packet->payload, 4)) == flow->l4.tcp.kafka_correlation_id)
- {
- NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n");
- ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA,
- NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI);
+ NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
return;
- }
}
+ if (ndpi_is_printable_buffer(&packet->payload[14], client_id_len) == 0)
+ {
+ NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
+ return;
+ }
+
+ ndpi_int_kafka_add_connection(ndpi_struct, flow);
+ return;
}
NDPI_EXCLUDE_PROTO(ndpi_struct, flow);