diff options
Diffstat (limited to 'src/lib/protocols/kafka.c')
-rw-r--r-- | src/lib/protocols/kafka.c | 51 |
1 files changed, 32 insertions, 19 deletions
diff --git a/src/lib/protocols/kafka.c b/src/lib/protocols/kafka.c index cffd1f32f..abf0ae3ca 100644 --- a/src/lib/protocols/kafka.c +++ b/src/lib/protocols/kafka.c @@ -29,6 +29,14 @@ #include "ndpi_api.h" #include "ndpi_private.h" +static void ndpi_int_kafka_add_connection(struct ndpi_detection_module_struct *ndpi_struct, + struct ndpi_flow_struct *flow) +{ + NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n"); + ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA, + NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI); +} + static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct, struct ndpi_flow_struct *flow) { @@ -41,32 +49,37 @@ static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct, * API keys: https://kafka.apache.org/protocol.html#protocol_api_keys * API versions: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+APIs */ - if (packet->payload_packet_len > 40 && - ntohl(get_u_int32_t(packet->payload, 0)) == (u_int32_t)(packet->payload_packet_len-4)) + if (packet->payload_packet_len < 8 /* min. required packet length */ || + ntohl(get_u_int32_t(packet->payload, 0)) != (uint32_t)(packet->payload_packet_len - 4)) + { + NDPI_EXCLUDE_PROTO(ndpi_struct, flow); + return; + } + + /* Request */ + if (ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */ + ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */) { - /* Request */ - if (!flow->l4.tcp.kafka_stage && - current_pkt_from_client_to_server(ndpi_struct, flow) && - ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */ - ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */) + if (packet->payload_packet_len < 14) { - flow->l4.tcp.kafka_correlation_id = ntohl(get_u_int16_t(packet->payload, 8)); - flow->l4.tcp.kafka_stage = 1; - return; + NDPI_EXCLUDE_PROTO(ndpi_struct, flow); + return; } - /* Response */ - if (flow->l4.tcp.kafka_stage == 1 && - current_pkt_from_server_to_client(ndpi_struct, flow)) + const uint16_t client_id_len = ntohs(get_u_int16_t(packet->payload, 12)); + if (client_id_len + 12 + 2 > packet->payload_packet_len) { - if (ntohl(get_u_int16_t(packet->payload, 4)) == flow->l4.tcp.kafka_correlation_id) - { - NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n"); - ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA, - NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI); + NDPI_EXCLUDE_PROTO(ndpi_struct, flow); return; - } } + if (ndpi_is_printable_buffer(&packet->payload[14], client_id_len) == 0) + { + NDPI_EXCLUDE_PROTO(ndpi_struct, flow); + return; + } + + ndpi_int_kafka_add_connection(ndpi_struct, flow); + return; } NDPI_EXCLUDE_PROTO(ndpi_struct, flow); |