diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/include/ndpi_typedefs.h | 6 | ||||
-rw-r--r-- | src/lib/protocols/kafka.c | 51 |
2 files changed, 32 insertions, 25 deletions
diff --git a/src/include/ndpi_typedefs.h b/src/include/ndpi_typedefs.h index ffc98ecd1..a2e10878f 100644 --- a/src/include/ndpi_typedefs.h +++ b/src/include/ndpi_typedefs.h @@ -835,9 +835,6 @@ struct ndpi_flow_tcp_struct { /* NDPI_PROTOCOL_SSH */ u_int32_t ssh_stage:3; - /* NDPI_PROTOCOL_KAFKA */ - u_int32_t kafka_stage:1; - /* NDPI_PROTOCOL_VNC */ u_int32_t vnc_stage:2; // 0 - 3 @@ -891,9 +888,6 @@ struct ndpi_flow_tcp_struct { /* NDPI_PROTOCOL_RADMIN */ u_int32_t radmin_stage:1; - - /* NDPI_PROTOCOL_KAFKA */ - u_int32_t kafka_correlation_id; }; /* ************************************************** */ diff --git a/src/lib/protocols/kafka.c b/src/lib/protocols/kafka.c index cffd1f32f..abf0ae3ca 100644 --- a/src/lib/protocols/kafka.c +++ b/src/lib/protocols/kafka.c @@ -29,6 +29,14 @@ #include "ndpi_api.h" #include "ndpi_private.h" +static void ndpi_int_kafka_add_connection(struct ndpi_detection_module_struct *ndpi_struct, + struct ndpi_flow_struct *flow) +{ + NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n"); + ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA, + NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI); +} + static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct, struct ndpi_flow_struct *flow) { @@ -41,32 +49,37 @@ static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct, * API keys: https://kafka.apache.org/protocol.html#protocol_api_keys * API versions: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+APIs */ - if (packet->payload_packet_len > 40 && - ntohl(get_u_int32_t(packet->payload, 0)) == (u_int32_t)(packet->payload_packet_len-4)) + if (packet->payload_packet_len < 8 /* min. required packet length */ || + ntohl(get_u_int32_t(packet->payload, 0)) != (uint32_t)(packet->payload_packet_len - 4)) + { + NDPI_EXCLUDE_PROTO(ndpi_struct, flow); + return; + } + + /* Request */ + if (ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */ + ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */) { - /* Request */ - if (!flow->l4.tcp.kafka_stage && - current_pkt_from_client_to_server(ndpi_struct, flow) && - ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */ - ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */) + if (packet->payload_packet_len < 14) { - flow->l4.tcp.kafka_correlation_id = ntohl(get_u_int16_t(packet->payload, 8)); - flow->l4.tcp.kafka_stage = 1; - return; + NDPI_EXCLUDE_PROTO(ndpi_struct, flow); + return; } - /* Response */ - if (flow->l4.tcp.kafka_stage == 1 && - current_pkt_from_server_to_client(ndpi_struct, flow)) + const uint16_t client_id_len = ntohs(get_u_int16_t(packet->payload, 12)); + if (client_id_len + 12 + 2 > packet->payload_packet_len) { - if (ntohl(get_u_int16_t(packet->payload, 4)) == flow->l4.tcp.kafka_correlation_id) - { - NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n"); - ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA, - NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI); + NDPI_EXCLUDE_PROTO(ndpi_struct, flow); return; - } } + if (ndpi_is_printable_buffer(&packet->payload[14], client_id_len) == 0) + { + NDPI_EXCLUDE_PROTO(ndpi_struct, flow); + return; + } + + ndpi_int_kafka_add_connection(ndpi_struct, flow); + return; } NDPI_EXCLUDE_PROTO(ndpi_struct, flow); |