diff options
-rw-r--r-- | src/include/ndpi_typedefs.h | 10 | ||||
-rw-r--r-- | src/lib/protocols/kafka.c | 38 | ||||
-rw-r--r-- | tests/cfgs/default/result/kafka.pcapng.out | 6 |
3 files changed, 41 insertions, 13 deletions
diff --git a/src/include/ndpi_typedefs.h b/src/include/ndpi_typedefs.h index 909b98bb2..58c9d7df5 100644 --- a/src/include/ndpi_typedefs.h +++ b/src/include/ndpi_typedefs.h @@ -838,6 +838,9 @@ struct ndpi_flow_tcp_struct { /* NDPI_PROTOCOL_SSH */ u_int32_t ssh_stage:3; + /* NDPI_PROTOCOL_KAFKA */ + u_int32_t kafka_stage:1; + /* NDPI_PROTOCOL_VNC */ u_int32_t vnc_stage:2; // 0 - 3 @@ -891,6 +894,9 @@ struct ndpi_flow_tcp_struct { /* NDPI_PROTOCOL_RADMIN */ u_int32_t radmin_stage:1; + + /* NDPI_PROTOCOL_KAFKA */ + u_int32_t kafka_correlation_id; }; /* ************************************************** */ @@ -1507,8 +1513,8 @@ struct ndpi_flow_struct { _Static_assert(sizeof(((struct ndpi_flow_struct *)0)->protos) <= 256, "Size of the struct member protocols increased to more than 256 bytes, " "please check if this change is necessary."); -_Static_assert(sizeof(struct ndpi_flow_struct) <= 1016, - "Size of the flow struct increased to more than 1016 bytes, " +_Static_assert(sizeof(struct ndpi_flow_struct) <= 1024, + "Size of the flow struct increased to more than 1024 bytes, " "please check if this change is necessary."); #endif #endif diff --git a/src/lib/protocols/kafka.c b/src/lib/protocols/kafka.c index 09c9b9c0c..1595b43be 100644 --- a/src/lib/protocols/kafka.c +++ b/src/lib/protocols/kafka.c @@ -36,15 +36,37 @@ static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct, NDPI_LOG_DBG(ndpi_struct, "search Apache Kafka\n"); - if (current_pkt_from_client_to_server(ndpi_struct, flow) && - packet->payload_packet_len > 40 && - ntohl(get_u_int32_t(packet->payload, 0)) == (u_int32_t)(packet->payload_packet_len-4) && - ntohs(get_u_int16_t(packet->payload, 4)) < 69) + /* All Kafka stuff start with 4 bytes containing the payload length + * minus 4 bytes. + * API keys: https://kafka.apache.org/protocol.html#protocol_api_keys + * API versions: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+APIs + */ + if (packet->payload_packet_len > 40 && + ntohl(get_u_int32_t(packet->payload, 0)) == (u_int32_t)(packet->payload_packet_len-4)) { - NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n"); - ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA, - NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI); - return; + /* Request */ + if (!flow->l4.tcp.kafka_stage && + current_pkt_from_client_to_server(ndpi_struct, flow) && + ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */ + ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */) + { + flow->l4.tcp.kafka_correlation_id = ntohl(get_u_int16_t(packet->payload, 8)); + flow->l4.tcp.kafka_stage = 1; + return; + } + + /* Response */ + if (flow->l4.tcp.kafka_stage == 1 && + current_pkt_from_server_to_client(ndpi_struct, flow)) + { + if (ntohl(get_u_int16_t(packet->payload, 4)) == flow->l4.tcp.kafka_correlation_id) + { + NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n"); + ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA, + NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI); + return; + } + } } NDPI_EXCLUDE_PROTO(ndpi_struct, flow); diff --git a/tests/cfgs/default/result/kafka.pcapng.out b/tests/cfgs/default/result/kafka.pcapng.out index 3721dec1d..b106e7959 100644 --- a/tests/cfgs/default/result/kafka.pcapng.out +++ b/tests/cfgs/default/result/kafka.pcapng.out @@ -1,6 +1,6 @@ -DPI Packets (TCP): 4 (4.00 pkts/flow) +DPI Packets (TCP): 6 (6.00 pkts/flow) Confidence DPI : 1 (flows) -Num dissector calls: 1 (1.00 diss/flow) +Num dissector calls: 148 (148.00 diss/flow) LRU cache ookla: 0/0/0 (insert/search/found) LRU cache bittorrent: 0/0/0 (insert/search/found) LRU cache zoom: 0/0/0 (insert/search/found) @@ -25,4 +25,4 @@ Kafka 19 2237 1 Acceptable 19 2237 1 - 1 TCP 127.0.0.1:46136 <-> 127.0.0.1:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 4][cat: RPC/16][12 pkts/1107 bytes <-> 7 pkts/1130 bytes][Goodput ratio: 28/58][13.63 sec][bytes ratio: -0.010 (Mixed)][IAT c2s/s2c min/avg/max/stddev: 0/0 800/288 6849/1049 2039/441][Pkt Len c2s/s2c min/avg/max/stddev: 66/66 92/161 206/512 42/149][PLAIN TEXT (console)][Plen Bins: 12,38,12,12,12,0,0,0,0,0,0,0,0,12,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + 1 TCP 127.0.0.1:46136 <-> 127.0.0.1:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 6][cat: RPC/16][12 pkts/1107 bytes <-> 7 pkts/1130 bytes][Goodput ratio: 28/58][13.63 sec][bytes ratio: -0.010 (Mixed)][IAT c2s/s2c min/avg/max/stddev: 0/0 800/288 6849/1049 2039/441][Pkt Len c2s/s2c min/avg/max/stddev: 66/66 92/161 206/512 42/149][PLAIN TEXT (console)][Plen Bins: 12,38,12,12,12,0,0,0,0,0,0,0,0,12,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] |