aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/include/ndpi_typedefs.h10
-rw-r--r--src/lib/protocols/kafka.c38
-rw-r--r--tests/cfgs/default/result/kafka.pcapng.out6
3 files changed, 41 insertions, 13 deletions
diff --git a/src/include/ndpi_typedefs.h b/src/include/ndpi_typedefs.h
index 909b98bb2..58c9d7df5 100644
--- a/src/include/ndpi_typedefs.h
+++ b/src/include/ndpi_typedefs.h
@@ -838,6 +838,9 @@ struct ndpi_flow_tcp_struct {
/* NDPI_PROTOCOL_SSH */
u_int32_t ssh_stage:3;
+ /* NDPI_PROTOCOL_KAFKA */
+ u_int32_t kafka_stage:1;
+
/* NDPI_PROTOCOL_VNC */
u_int32_t vnc_stage:2; // 0 - 3
@@ -891,6 +894,9 @@ struct ndpi_flow_tcp_struct {
/* NDPI_PROTOCOL_RADMIN */
u_int32_t radmin_stage:1;
+
+ /* NDPI_PROTOCOL_KAFKA */
+ u_int32_t kafka_correlation_id;
};
/* ************************************************** */
@@ -1507,8 +1513,8 @@ struct ndpi_flow_struct {
_Static_assert(sizeof(((struct ndpi_flow_struct *)0)->protos) <= 256,
"Size of the struct member protocols increased to more than 256 bytes, "
"please check if this change is necessary.");
-_Static_assert(sizeof(struct ndpi_flow_struct) <= 1016,
- "Size of the flow struct increased to more than 1016 bytes, "
+_Static_assert(sizeof(struct ndpi_flow_struct) <= 1024,
+ "Size of the flow struct increased to more than 1024 bytes, "
"please check if this change is necessary.");
#endif
#endif
diff --git a/src/lib/protocols/kafka.c b/src/lib/protocols/kafka.c
index 09c9b9c0c..1595b43be 100644
--- a/src/lib/protocols/kafka.c
+++ b/src/lib/protocols/kafka.c
@@ -36,15 +36,37 @@ static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct,
NDPI_LOG_DBG(ndpi_struct, "search Apache Kafka\n");
- if (current_pkt_from_client_to_server(ndpi_struct, flow) &&
- packet->payload_packet_len > 40 &&
- ntohl(get_u_int32_t(packet->payload, 0)) == (u_int32_t)(packet->payload_packet_len-4) &&
- ntohs(get_u_int16_t(packet->payload, 4)) < 69)
+ /* All Kafka stuff start with 4 bytes containing the payload length
+ * minus 4 bytes.
+ * API keys: https://kafka.apache.org/protocol.html#protocol_api_keys
+ * API versions: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+APIs
+ */
+ if (packet->payload_packet_len > 40 &&
+ ntohl(get_u_int32_t(packet->payload, 0)) == (u_int32_t)(packet->payload_packet_len-4))
{
- NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n");
- ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA,
- NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI);
- return;
+ /* Request */
+ if (!flow->l4.tcp.kafka_stage &&
+ current_pkt_from_client_to_server(ndpi_struct, flow) &&
+ ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */
+ ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */)
+ {
+ flow->l4.tcp.kafka_correlation_id = ntohl(get_u_int16_t(packet->payload, 8));
+ flow->l4.tcp.kafka_stage = 1;
+ return;
+ }
+
+ /* Response */
+ if (flow->l4.tcp.kafka_stage == 1 &&
+ current_pkt_from_server_to_client(ndpi_struct, flow))
+ {
+ if (ntohl(get_u_int16_t(packet->payload, 4)) == flow->l4.tcp.kafka_correlation_id)
+ {
+ NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n");
+ ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA,
+ NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI);
+ return;
+ }
+ }
}
NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
diff --git a/tests/cfgs/default/result/kafka.pcapng.out b/tests/cfgs/default/result/kafka.pcapng.out
index 3721dec1d..b106e7959 100644
--- a/tests/cfgs/default/result/kafka.pcapng.out
+++ b/tests/cfgs/default/result/kafka.pcapng.out
@@ -1,6 +1,6 @@
-DPI Packets (TCP): 4 (4.00 pkts/flow)
+DPI Packets (TCP): 6 (6.00 pkts/flow)
Confidence DPI : 1 (flows)
-Num dissector calls: 1 (1.00 diss/flow)
+Num dissector calls: 148 (148.00 diss/flow)
LRU cache ookla: 0/0/0 (insert/search/found)
LRU cache bittorrent: 0/0/0 (insert/search/found)
LRU cache zoom: 0/0/0 (insert/search/found)
@@ -25,4 +25,4 @@ Kafka 19 2237 1
Acceptable 19 2237 1
- 1 TCP 127.0.0.1:46136 <-> 127.0.0.1:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 4][cat: RPC/16][12 pkts/1107 bytes <-> 7 pkts/1130 bytes][Goodput ratio: 28/58][13.63 sec][bytes ratio: -0.010 (Mixed)][IAT c2s/s2c min/avg/max/stddev: 0/0 800/288 6849/1049 2039/441][Pkt Len c2s/s2c min/avg/max/stddev: 66/66 92/161 206/512 42/149][PLAIN TEXT (console)][Plen Bins: 12,38,12,12,12,0,0,0,0,0,0,0,0,12,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
+ 1 TCP 127.0.0.1:46136 <-> 127.0.0.1:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 6][cat: RPC/16][12 pkts/1107 bytes <-> 7 pkts/1130 bytes][Goodput ratio: 28/58][13.63 sec][bytes ratio: -0.010 (Mixed)][IAT c2s/s2c min/avg/max/stddev: 0/0 800/288 6849/1049 2039/441][Pkt Len c2s/s2c min/avg/max/stddev: 66/66 92/161 206/512 42/149][PLAIN TEXT (console)][Plen Bins: 12,38,12,12,12,0,0,0,0,0,0,0,0,12,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]