diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2020-09-25 20:03:14 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2020-09-25 20:43:23 +0200 |
commit | 6c0ac8b0457dd56d99f55ccc87e3c9495ee6f412 (patch) | |
tree | 828e1d642572718b38d92083cdb82af9277556b4 | |
parent | 370ca7c00d23536f2d73f9a80994f24f35f7a877 (diff) |
Added new flow event: FLOW_EVENT_DETECTION_UPDATE
* This event is triggered whenever nDPI's detection (hopefully) has new information about a flow for us.
* A detection change is recognized by hashing certain members of the ndpi flow struct with 32-bit murmur3.
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rw-r--r-- | config.h | 3 | ||||
-rwxr-xr-x | examples/py-flow-info/flow-info.py | 4 | ||||
-rw-r--r-- | nDPId.c | 161 |
3 files changed, 128 insertions, 40 deletions
@@ -18,8 +18,9 @@ #define nDPId_IDLE_SCAN_PERIOD 10000 /* 10 sec */ #define nDPId_IDLE_TIME 600000 /* 600 sec */ #define nDPId_POST_END_FLOW_TIME 60000 /* 60 sec */ -#define nDPId_INITIAL_THREAD_HASH 0x03dd018b +#define nDPId_THREAD_DISTRIBUTION_SEED 0x03dd018b #define nDPId_PACKETS_PER_FLOW_TO_SEND 15 +#define nDPId_FLOW_STRUCT_SEED 0x5defc104 /* nDPIsrvd default config options */ #define nDPIsrvd_PIDFILE "/tmp/ndpisrvd.pid" diff --git a/examples/py-flow-info/flow-info.py b/examples/py-flow-info/flow-info.py index 27ae2e626..77eb70c15 100755 --- a/examples/py-flow-info/flow-info.py +++ b/examples/py-flow-info/flow-info.py @@ -22,8 +22,8 @@ def parse_json_str(json_str): event_str = 'Idle flow' elif event == 'detected': event_str = 'Detected' - elif event == 'detected-extra': - event_str = 'Detected Extra' + elif event == 'detection-update': + event_str = 'Update' elif event == 'guessed': event_str = 'Guessed' elif event == 'not-detected': @@ -64,6 +64,7 @@ struct nDPId_flow_info uint8_t reserved_00 : 5; uint8_t reserved_01[3]; uint8_t l4_protocol; + uint32_t last_ndpi_flow_struct_hash; struct ndpi_proto detected_l7_protocol; struct ndpi_proto guessed_protocol; @@ -132,7 +133,7 @@ enum flow_event FLOW_EVENT_GUESSED, FLOW_EVENT_DETECTED, - FLOW_EVENT_DETECTED_EXTRA, + FLOW_EVENT_DETECTION_UPDATE, FLOW_EVENT_NOT_DETECTED, FLOW_EVENT_COUNT @@ -173,7 +174,7 @@ static char const * const flow_event_name_table[FLOW_EVENT_COUNT] = {[FLOW_EVENT [FLOW_EVENT_IDLE] = "idle", [FLOW_EVENT_GUESSED] = "guessed", [FLOW_EVENT_DETECTED] = "detected", - [FLOW_EVENT_DETECTED_EXTRA] = "detected-extra", + [FLOW_EVENT_DETECTION_UPDATE] = "detection-update", [FLOW_EVENT_NOT_DETECTED] = "not-detected"}; static char const * const basic_event_name_table[BASIC_EVENT_COUNT] = { [BASIC_EVENT_INVALID] = "invalid", @@ -270,9 +271,7 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device) if (workflow->pcap_handle == NULL) { syslog(LOG_DAEMON | LOG_ERR, - 
(pcap_argument_is_file == 0 ? - "pcap_open_live: %.*s" : - "pcap_open_offline_with_tstamp_precision: %.*s"), + (pcap_argument_is_file == 0 ? "pcap_open_live: %.*s" : "pcap_open_offline_with_tstamp_precision: %.*s"), (int)PCAP_ERRBUF_SIZE, pcap_error_buffer); free_workflow(&workflow); @@ -960,6 +959,7 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa serialize_and_send(reader_thread); } +/* I decided against ndpi_flow2json as does not fulfill my needs. */ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, struct nDPId_flow_info const * const flow, enum flow_event event) @@ -991,7 +991,7 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, case FLOW_EVENT_GUESSED: case FLOW_EVENT_DETECTED: - case FLOW_EVENT_DETECTED_EXTRA: + case FLOW_EVENT_DETECTION_UPDATE: case FLOW_EVENT_NOT_DETECTED: if (ndpi_dpi2json(workflow->ndpi_struct, flow->ndpi_flow, @@ -1185,6 +1185,76 @@ __attribute__((format(printf, 3, 4))) static void jsonize_basic_eventf(struct nD serialize_and_send(reader_thread); } +/* See: https://en.wikipedia.org/wiki/MurmurHash#MurmurHash3 */ +static inline uint32_t murmur_32_scramble(uint32_t k) +{ + k *= 0xcc9e2d51; + k = (k << 15) | (k >> 17); + k *= 0x1b873593; + return k; +} + +/* See: https://en.wikipedia.org/wiki/MurmurHash#MurmurHash3 */ +static uint32_t murmur3_32(uint8_t const * key, size_t len, uint32_t seed) +{ + uint32_t h = seed; + uint32_t k; + /* Read in groups of 4. */ + for (size_t i = len >> 2; i; i--) + { + k = htole32(*(uint32_t *)key); + key += sizeof(uint32_t); + h ^= murmur_32_scramble(k); + h = (h << 13) | (h >> 19); + h = h * 5 + 0xe6546b64; + } + /* Read the rest. */ + k = 0; + for (size_t i = len & 3; i; i--) + { + k <<= 8; + k |= key[i - 1]; + } + // A swap is *not* necessary here because the preceding loop already + // places the low bytes in the low places according to whatever endianness + // we use. 
Swaps only apply when the memory is copied in a chunk. + h ^= murmur_32_scramble(k); + /* Finalize. */ + h ^= len; + h ^= h >> 16; + h *= 0x85ebca6b; + h ^= h >> 13; + h *= 0xc2b2ae35; + h ^= h >> 16; + return h; +} + +static uint32_t calculate_ndpi_flow_struct_hash(struct ndpi_flow_struct const * const ndpi_flow) +{ + /* + * This is a kludge, but necessary for now as I do not want to spam nDPIsrvd and clients + * with the same detection json string over and over again. + * So we are building a hash over the more "stable" parts of the ndpi flow struct. + * Stable in terms of they should only change if the detection changes for whatever reason. + * At the time of writing, nDPI has no API function to check if the detection changed + * or has some new information available. This is far from perfect. + */ + uint32_t hash = murmur3_32((uint8_t const *)&ndpi_flow->protos, + sizeof(ndpi_flow->protos), + nDPId_FLOW_STRUCT_SEED); + hash += ndpi_flow->category; + hash += ndpi_flow->risk; + + const size_t protocol_bitmask_size = sizeof(ndpi_flow->src->detected_protocol_bitmask.fds_bits) / + sizeof(ndpi_flow->src->detected_protocol_bitmask.fds_bits[0]); + for (size_t i = 0; i < protocol_bitmask_size; ++i) + { + hash += ndpi_flow->src->detected_protocol_bitmask.fds_bits[i]; + hash += ndpi_flow->dst->detected_protocol_bitmask.fds_bits[i]; + } + return hash; +} + static void ndpi_process_packet(uint8_t * const args, struct pcap_pkthdr const * const header, uint8_t const * const packet) @@ -1215,7 +1285,7 @@ static void ndpi_process_packet(uint8_t * const args, uint16_t l4_len = 0; uint16_t type; - int thread_index = nDPId_INITIAL_THREAD_HASH; // generated with `dd if=/dev/random bs=1024 count=1 |& hd' + int thread_index = nDPId_THREAD_DISTRIBUTION_SEED; // generated with `dd if=/dev/random bs=1024 count=1 |& hd' if (reader_thread == NULL) { @@ -1229,8 +1299,7 @@ static void ndpi_process_packet(uint8_t * const args, } workflow->packets_captured++; - time_ms = - 
((uint64_t)header->ts.tv_sec) * tick_resolution + header->ts.tv_usec / (1000000 / tick_resolution); + time_ms = ((uint64_t)header->ts.tv_sec) * tick_resolution + header->ts.tv_usec / (1000000 / tick_resolution); workflow->last_time = time_ms; check_for_idle_flows(reader_thread); @@ -1707,11 +1776,16 @@ static void ndpi_process_packet(uint8_t * const args, flow_to_process->detection_completed = 1; workflow->detected_flow_protocols++; jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTED); - } else if (flow_to_process->detection_completed == 1 && - flow_to_process->ndpi_flow->check_extra_packets) + flow_to_process->last_ndpi_flow_struct_hash = calculate_ndpi_flow_struct_hash(flow_to_process->ndpi_flow); + } + else if (flow_to_process->detection_completed == 1) { - /* TODO: Throw only FLOW_EVENT_DETECTED_EXTRA if the JSON string changes. */ - jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTED_EXTRA); + uint32_t hash = calculate_ndpi_flow_struct_hash(flow_to_process->ndpi_flow); + if (hash != flow_to_process->last_ndpi_flow_struct_hash) + { + jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE); + flow_to_process->last_ndpi_flow_struct_hash = hash; + } } } @@ -2049,8 +2123,10 @@ static int parse_options(int argc, char ** argv) log_to_stderr = 1; if (setvbuf(stderr, NULL, _IOLBF, 0) != 0) { - fprintf(stderr, "%s: Could not set stderr line-buffered, " - "console syslog() messages may appear weird.\n", argv[0]); + fprintf(stderr, + "%s: Could not set stderr line-buffered, " + "console syslog() messages may appear weird.\n", + argv[0]); } break; case 'c': @@ -2080,7 +2156,8 @@ static int parse_options(int argc, char ** argv) { char * endptr; int subopt = getsubopt(&subopts, subopt_token, &value); - if (subopt == -1) { + if (subopt == -1) + { fprintf(stderr, "Invalid subopt: %s\n\n", value); fprintf(stderr, usage, argv[0]); print_subopt_usage(); @@ -2088,14 +2165,17 @@ static int parse_options(int argc, char ** 
argv) } long int value_llu = strtoull(value, &endptr, 10); - if (value == endptr) { - fprintf(stderr, "Subopt `%s': Value `%s' is not a valid number.\n", - subopt_token[subopt], value); + if (value == endptr) + { + fprintf(stderr, + "Subopt `%s': Value `%s' is not a valid number.\n", + subopt_token[subopt], + value); return 1; } - if (errno == ERANGE) { - fprintf(stderr, "Subopt `%s': Number too large.\n", - subopt_token[subopt]); + if (errno == ERANGE) + { + fprintf(stderr, "Subopt `%s': Number too large.\n", subopt_token[subopt]); return 1; } @@ -2127,10 +2207,8 @@ static int parse_options(int argc, char ** argv) break; } - if (errno == ERANGE) { - } - if (value == endptr) { - } + if (errno == ERANGE) {} + if (value == endptr) {} } break; } @@ -2141,7 +2219,8 @@ static int parse_options(int argc, char ** argv) } } - if (optind < argc) { + if (optind < argc) + { fprintf(stderr, "Unexpected argument after options\n\n"); fprintf(stderr, usage, argv[0]); print_subopt_usage(); @@ -2155,31 +2234,38 @@ static int validate_options(char const * const arg0) { int retval = 0; - if (max_flows_per_thread < 128 || max_flows_per_thread > nDPId_MAX_FLOWS_PER_THREAD) { + if (max_flows_per_thread < 128 || max_flows_per_thread > nDPId_MAX_FLOWS_PER_THREAD) + { fprintf(stderr, "%s: 128 < max-flows-per-thread < %d\n", arg0, nDPId_MAX_FLOWS_PER_THREAD); retval = 1; } - if (max_idle_flows_per_thread < 64 || max_idle_flows_per_thread > nDPId_MAX_IDLE_FLOWS_PER_THREAD) { + if (max_idle_flows_per_thread < 64 || max_idle_flows_per_thread > nDPId_MAX_IDLE_FLOWS_PER_THREAD) + { fprintf(stderr, "%s: 64 < max-idle-flows-per-thread < %d\n", arg0, nDPId_MAX_IDLE_FLOWS_PER_THREAD); retval = 1; } - if (tick_resolution < 100) { + if (tick_resolution < 100) + { fprintf(stderr, "%s: tick-resolution > 100\n", arg0); retval = 1; } - if (reader_thread_count < 1 || reader_thread_count > nDPId_MAX_READER_THREADS) { + if (reader_thread_count < 1 || reader_thread_count > nDPId_MAX_READER_THREADS) + { 
fprintf(stderr, "%s: 1 < reader-thread-count < %d\n", arg0, nDPId_MAX_READER_THREADS); retval = 1; } - if (idle_scan_period < 1000) { + if (idle_scan_period < 1000) + { fprintf(stderr, "%s: idle-scan-period > 1000\n", arg0); retval = 1; } - if (max_idle_time < 60) { + if (max_idle_time < 60) + { fprintf(stderr, "%s: max-idle-time > 60\n", arg0); retval = 1; } - if (max_post_end_flow_time > max_idle_time) { + if (max_post_end_flow_time > max_idle_time) + { fprintf(stderr, "%s: max-post-end-flow-time < max_idle_time\n", arg0); retval = 1; } @@ -2215,9 +2301,10 @@ int main(int argc, char ** argv) pcap_lib_version() + strlen("libpcap version ")); if (ndpi_get_gcrypt_version() != NULL) { - printf("gcrypt version: %s\n" - "----------------------------------\n", - ndpi_get_gcrypt_version()); + printf( + "gcrypt version: %s\n" + "----------------------------------\n", + ndpi_get_gcrypt_version()); } openlog("nDPId", LOG_CONS | LOG_PERROR, LOG_DAEMON); |