aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2020-09-25 20:03:14 +0200
committerToni Uhlig <matzeton@googlemail.com>2020-09-25 20:43:23 +0200
commit6c0ac8b0457dd56d99f55ccc87e3c9495ee6f412 (patch)
tree828e1d642572718b38d92083cdb82af9277556b4
parent370ca7c00d23536f2d73f9a80994f24f35f7a877 (diff)
Added new flow event: FLOW_EVENT_DETECTION_UPDATE
* This event will be triggered when nDPI detection has some new information for us (hopefully). * Detection change is based on hashing with 32-bit murmur3 certain members of the ndpi flow struct. Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rw-r--r--config.h3
-rwxr-xr-xexamples/py-flow-info/flow-info.py4
-rw-r--r--nDPId.c161
3 files changed, 128 insertions, 40 deletions
diff --git a/config.h b/config.h
index 78d99cc78..8caa3d08e 100644
--- a/config.h
+++ b/config.h
@@ -18,8 +18,9 @@
#define nDPId_IDLE_SCAN_PERIOD 10000 /* 10 sec */
#define nDPId_IDLE_TIME 600000 /* 600 sec */
#define nDPId_POST_END_FLOW_TIME 60000 /* 60 sec */
-#define nDPId_INITIAL_THREAD_HASH 0x03dd018b
+#define nDPId_THREAD_DISTRIBUTION_SEED 0x03dd018b
#define nDPId_PACKETS_PER_FLOW_TO_SEND 15
+#define nDPId_FLOW_STRUCT_SEED 0x5defc104
/* nDPIsrvd default config options */
#define nDPIsrvd_PIDFILE "/tmp/ndpisrvd.pid"
diff --git a/examples/py-flow-info/flow-info.py b/examples/py-flow-info/flow-info.py
index 27ae2e626..77eb70c15 100755
--- a/examples/py-flow-info/flow-info.py
+++ b/examples/py-flow-info/flow-info.py
@@ -22,8 +22,8 @@ def parse_json_str(json_str):
event_str = 'Idle flow'
elif event == 'detected':
event_str = 'Detected'
- elif event == 'detected-extra':
- event_str = 'Detected Extra'
+ elif event == 'detection-update':
+ event_str = 'Update'
elif event == 'guessed':
event_str = 'Guessed'
elif event == 'not-detected':
diff --git a/nDPId.c b/nDPId.c
index 99e12d5ca..b3bf7c5dc 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -64,6 +64,7 @@ struct nDPId_flow_info
uint8_t reserved_00 : 5;
uint8_t reserved_01[3];
uint8_t l4_protocol;
+ uint32_t last_ndpi_flow_struct_hash;
struct ndpi_proto detected_l7_protocol;
struct ndpi_proto guessed_protocol;
@@ -132,7 +133,7 @@ enum flow_event
FLOW_EVENT_GUESSED,
FLOW_EVENT_DETECTED,
- FLOW_EVENT_DETECTED_EXTRA,
+ FLOW_EVENT_DETECTION_UPDATE,
FLOW_EVENT_NOT_DETECTED,
FLOW_EVENT_COUNT
@@ -173,7 +174,7 @@ static char const * const flow_event_name_table[FLOW_EVENT_COUNT] = {[FLOW_EVENT
[FLOW_EVENT_IDLE] = "idle",
[FLOW_EVENT_GUESSED] = "guessed",
[FLOW_EVENT_DETECTED] = "detected",
- [FLOW_EVENT_DETECTED_EXTRA] = "detected-extra",
+ [FLOW_EVENT_DETECTION_UPDATE] = "detection-update",
[FLOW_EVENT_NOT_DETECTED] = "not-detected"};
static char const * const basic_event_name_table[BASIC_EVENT_COUNT] = {
[BASIC_EVENT_INVALID] = "invalid",
@@ -270,9 +271,7 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
if (workflow->pcap_handle == NULL)
{
syslog(LOG_DAEMON | LOG_ERR,
- (pcap_argument_is_file == 0 ?
- "pcap_open_live: %.*s" :
- "pcap_open_offline_with_tstamp_precision: %.*s"),
+ (pcap_argument_is_file == 0 ? "pcap_open_live: %.*s" : "pcap_open_offline_with_tstamp_precision: %.*s"),
(int)PCAP_ERRBUF_SIZE,
pcap_error_buffer);
free_workflow(&workflow);
@@ -960,6 +959,7 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa
serialize_and_send(reader_thread);
}
+/* I decided against ndpi_flow2json as does not fulfill my needs. */
static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
struct nDPId_flow_info const * const flow,
enum flow_event event)
@@ -991,7 +991,7 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
case FLOW_EVENT_GUESSED:
case FLOW_EVENT_DETECTED:
- case FLOW_EVENT_DETECTED_EXTRA:
+ case FLOW_EVENT_DETECTION_UPDATE:
case FLOW_EVENT_NOT_DETECTED:
if (ndpi_dpi2json(workflow->ndpi_struct,
flow->ndpi_flow,
@@ -1185,6 +1185,76 @@ __attribute__((format(printf, 3, 4))) static void jsonize_basic_eventf(struct nD
serialize_and_send(reader_thread);
}
+/* See: https://en.wikipedia.org/wiki/MurmurHash#MurmurHash3 */
+static inline uint32_t murmur_32_scramble(uint32_t k)
+{
+ k *= 0xcc9e2d51;
+ k = (k << 15) | (k >> 17);
+ k *= 0x1b873593;
+ return k;
+}
+
+/* See: https://en.wikipedia.org/wiki/MurmurHash#MurmurHash3 */
+static uint32_t murmur3_32(uint8_t const * key, size_t len, uint32_t seed)
+{
+ uint32_t h = seed;
+ uint32_t k;
+ /* Read in groups of 4. */
+ for (size_t i = len >> 2; i; i--)
+ {
+ k = htole32(*(uint32_t *)key);
+ key += sizeof(uint32_t);
+ h ^= murmur_32_scramble(k);
+ h = (h << 13) | (h >> 19);
+ h = h * 5 + 0xe6546b64;
+ }
+ /* Read the rest. */
+ k = 0;
+ for (size_t i = len & 3; i; i--)
+ {
+ k <<= 8;
+ k |= key[i - 1];
+ }
+ // A swap is *not* necessary here because the preceding loop already
+ // places the low bytes in the low places according to whatever endianness
+ // we use. Swaps only apply when the memory is copied in a chunk.
+ h ^= murmur_32_scramble(k);
+ /* Finalize. */
+ h ^= len;
+ h ^= h >> 16;
+ h *= 0x85ebca6b;
+ h ^= h >> 13;
+ h *= 0xc2b2ae35;
+ h ^= h >> 16;
+ return h;
+}
+
+static uint32_t calculate_ndpi_flow_struct_hash(struct ndpi_flow_struct const * const ndpi_flow)
+{
+ /*
+ * This is a kludge, but necessary for now as I do not want to spam nDPIsrvd and clients
+ * with the same detection json string over and over again.
+ * So we are building a hash over the more "stable" parts of the ndpi flow struct.
+ * Stable in terms of they should only change if the detection changes for whatever reason.
+ * At the time of writing, nDPI has no API function to check if the detection changed
+ * or has some new information available. This is far from perfect.
+ */
+ uint32_t hash = murmur3_32((uint8_t const *)&ndpi_flow->protos,
+ sizeof(ndpi_flow->protos),
+ nDPId_FLOW_STRUCT_SEED);
+ hash += ndpi_flow->category;
+ hash += ndpi_flow->risk;
+
+ const size_t protocol_bitmask_size = sizeof(ndpi_flow->src->detected_protocol_bitmask.fds_bits) /
+ sizeof(ndpi_flow->src->detected_protocol_bitmask.fds_bits[0]);
+ for (size_t i = 0; i < protocol_bitmask_size; ++i)
+ {
+ hash += ndpi_flow->src->detected_protocol_bitmask.fds_bits[i];
+ hash += ndpi_flow->dst->detected_protocol_bitmask.fds_bits[i];
+ }
+ return hash;
+}
+
static void ndpi_process_packet(uint8_t * const args,
struct pcap_pkthdr const * const header,
uint8_t const * const packet)
@@ -1215,7 +1285,7 @@ static void ndpi_process_packet(uint8_t * const args,
uint16_t l4_len = 0;
uint16_t type;
- int thread_index = nDPId_INITIAL_THREAD_HASH; // generated with `dd if=/dev/random bs=1024 count=1 |& hd'
+ int thread_index = nDPId_THREAD_DISTRIBUTION_SEED; // generated with `dd if=/dev/random bs=1024 count=1 |& hd'
if (reader_thread == NULL)
{
@@ -1229,8 +1299,7 @@ static void ndpi_process_packet(uint8_t * const args,
}
workflow->packets_captured++;
- time_ms =
- ((uint64_t)header->ts.tv_sec) * tick_resolution + header->ts.tv_usec / (1000000 / tick_resolution);
+ time_ms = ((uint64_t)header->ts.tv_sec) * tick_resolution + header->ts.tv_usec / (1000000 / tick_resolution);
workflow->last_time = time_ms;
check_for_idle_flows(reader_thread);
@@ -1707,11 +1776,16 @@ static void ndpi_process_packet(uint8_t * const args,
flow_to_process->detection_completed = 1;
workflow->detected_flow_protocols++;
jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTED);
- } else if (flow_to_process->detection_completed == 1 &&
- flow_to_process->ndpi_flow->check_extra_packets)
+ flow_to_process->last_ndpi_flow_struct_hash = calculate_ndpi_flow_struct_hash(flow_to_process->ndpi_flow);
+ }
+ else if (flow_to_process->detection_completed == 1)
{
- /* TODO: Throw only FLOW_EVENT_DETECTED_EXTRA if the JSON string changes. */
- jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTED_EXTRA);
+ uint32_t hash = calculate_ndpi_flow_struct_hash(flow_to_process->ndpi_flow);
+ if (hash != flow_to_process->last_ndpi_flow_struct_hash)
+ {
+ jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE);
+ flow_to_process->last_ndpi_flow_struct_hash = hash;
+ }
}
}
@@ -2049,8 +2123,10 @@ static int parse_options(int argc, char ** argv)
log_to_stderr = 1;
if (setvbuf(stderr, NULL, _IOLBF, 0) != 0)
{
- fprintf(stderr, "%s: Could not set stderr line-buffered, "
- "console syslog() messages may appear weird.\n", argv[0]);
+ fprintf(stderr,
+ "%s: Could not set stderr line-buffered, "
+ "console syslog() messages may appear weird.\n",
+ argv[0]);
}
break;
case 'c':
@@ -2080,7 +2156,8 @@ static int parse_options(int argc, char ** argv)
{
char * endptr;
int subopt = getsubopt(&subopts, subopt_token, &value);
- if (subopt == -1) {
+ if (subopt == -1)
+ {
fprintf(stderr, "Invalid subopt: %s\n\n", value);
fprintf(stderr, usage, argv[0]);
print_subopt_usage();
@@ -2088,14 +2165,17 @@ static int parse_options(int argc, char ** argv)
}
long int value_llu = strtoull(value, &endptr, 10);
- if (value == endptr) {
- fprintf(stderr, "Subopt `%s': Value `%s' is not a valid number.\n",
- subopt_token[subopt], value);
+ if (value == endptr)
+ {
+ fprintf(stderr,
+ "Subopt `%s': Value `%s' is not a valid number.\n",
+ subopt_token[subopt],
+ value);
return 1;
}
- if (errno == ERANGE) {
- fprintf(stderr, "Subopt `%s': Number too large.\n",
- subopt_token[subopt]);
+ if (errno == ERANGE)
+ {
+ fprintf(stderr, "Subopt `%s': Number too large.\n", subopt_token[subopt]);
return 1;
}
@@ -2127,10 +2207,8 @@ static int parse_options(int argc, char ** argv)
break;
}
- if (errno == ERANGE) {
- }
- if (value == endptr) {
- }
+ if (errno == ERANGE) {}
+ if (value == endptr) {}
}
break;
}
@@ -2141,7 +2219,8 @@ static int parse_options(int argc, char ** argv)
}
}
- if (optind < argc) {
+ if (optind < argc)
+ {
fprintf(stderr, "Unexpected argument after options\n\n");
fprintf(stderr, usage, argv[0]);
print_subopt_usage();
@@ -2155,31 +2234,38 @@ static int validate_options(char const * const arg0)
{
int retval = 0;
- if (max_flows_per_thread < 128 || max_flows_per_thread > nDPId_MAX_FLOWS_PER_THREAD) {
+ if (max_flows_per_thread < 128 || max_flows_per_thread > nDPId_MAX_FLOWS_PER_THREAD)
+ {
fprintf(stderr, "%s: 128 < max-flows-per-thread < %d\n", arg0, nDPId_MAX_FLOWS_PER_THREAD);
retval = 1;
}
- if (max_idle_flows_per_thread < 64 || max_idle_flows_per_thread > nDPId_MAX_IDLE_FLOWS_PER_THREAD) {
+ if (max_idle_flows_per_thread < 64 || max_idle_flows_per_thread > nDPId_MAX_IDLE_FLOWS_PER_THREAD)
+ {
fprintf(stderr, "%s: 64 < max-idle-flows-per-thread < %d\n", arg0, nDPId_MAX_IDLE_FLOWS_PER_THREAD);
retval = 1;
}
- if (tick_resolution < 100) {
+ if (tick_resolution < 100)
+ {
fprintf(stderr, "%s: tick-resolution > 100\n", arg0);
retval = 1;
}
- if (reader_thread_count < 1 || reader_thread_count > nDPId_MAX_READER_THREADS) {
+ if (reader_thread_count < 1 || reader_thread_count > nDPId_MAX_READER_THREADS)
+ {
fprintf(stderr, "%s: 1 < reader-thread-count < %d\n", arg0, nDPId_MAX_READER_THREADS);
retval = 1;
}
- if (idle_scan_period < 1000) {
+ if (idle_scan_period < 1000)
+ {
fprintf(stderr, "%s: idle-scan-period > 1000\n", arg0);
retval = 1;
}
- if (max_idle_time < 60) {
+ if (max_idle_time < 60)
+ {
fprintf(stderr, "%s: max-idle-time > 60\n", arg0);
retval = 1;
}
- if (max_post_end_flow_time > max_idle_time) {
+ if (max_post_end_flow_time > max_idle_time)
+ {
fprintf(stderr, "%s: max-post-end-flow-time < max_idle_time\n", arg0);
retval = 1;
}
@@ -2215,9 +2301,10 @@ int main(int argc, char ** argv)
pcap_lib_version() + strlen("libpcap version "));
if (ndpi_get_gcrypt_version() != NULL)
{
- printf("gcrypt version: %s\n"
- "----------------------------------\n",
- ndpi_get_gcrypt_version());
+ printf(
+ "gcrypt version: %s\n"
+ "----------------------------------\n",
+ ndpi_get_gcrypt_version());
}
openlog("nDPId", LOG_CONS | LOG_PERROR, LOG_DAEMON);