diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2021-12-15 23:25:32 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2022-01-20 00:50:38 +0100 |
commit | 9e07a57566cc45bf92a845d8cee968d72e0f314e (patch) | |
tree | 8f1a6bfd08bd68a5253fadf3a01beecda77b1c95 /examples | |
parent | a35fc1d5ea8570609cc0c8cf6edadc81f8f5bb76 (diff) |
Major nDPId extension. Sorry for the huge commit.
- nDPId: fixed invalid IP4/IP6 tuple compare
- nDPIsrvd: fixed caching issue (finally)
- added tiny c example (can be used to check flow manager sanity)
- c-captured: use flow_last_seen timestamp from `struct nDPIsrvd_flow`
- README.md update: added example JSON sequence
- nDPId: added new flow event `update` necessary for correct
timeout handling (and other future use-cases)
- nDPIsrvd.h and nDPIsrvd.py: switched to an instance
(consists of an alias/source tuple) based flow manager
- every flow related event **must** now serialize `alias`, `source`,
`flow_id`, `flow_last_seen` and `flow_idle_time` to make the timeout
handling and verification process work correctly
- nDPIsrvd.h: ability to profile any dynamic memory (de-)allocation
- nDPIsrvd.py: removed PcapPacket class (unused)
- py-flow-dashboard and py-flow-multiprocess: fixed race condition
- py-flow-info: print statusbar with probably useful information
- nDPId/nDPIsrvd.h: switched from packet-flow only timestamps (`pkt_*sec`)
to a generic flow event timestamp `ts_msec`
- nDPId-test: added additional checks
- nDPId: increased ICMP flow timeout
- nDPId: using event based i/o if capturing packets from a device
- nDPIsrvd: fixed memory leak on shutdown if remote descriptors
were still connected
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'examples')
-rw-r--r-- | examples/README.md | 8 | ||||
-rw-r--r-- | examples/c-captured/c-captured.c | 67 | ||||
-rw-r--r-- | examples/c-collectd/c-collectd.c | 30 | ||||
-rw-r--r-- | examples/c-simple/c-simple.c | 162 | ||||
-rw-r--r-- | examples/go-dashboard/main.go | 8 | ||||
-rwxr-xr-x | examples/py-flow-dashboard/flow-dash.py | 38 | ||||
-rwxr-xr-x | examples/py-flow-info/flow-info.py | 187 | ||||
-rwxr-xr-x | examples/py-flow-muliprocess/py-flow-multiprocess.py | 37 | ||||
-rwxr-xr-x | examples/py-ja3-checker/py-ja3-checker.py | 4 | ||||
-rwxr-xr-x | examples/py-json-stdout/json-stdout.py | 4 | ||||
-rwxr-xr-x | examples/py-schema-validation/py-schema-validation.py | 4 | ||||
-rwxr-xr-x | examples/py-semantic-validation/py-semantic-validation.py | 237 |
12 files changed, 634 insertions, 152 deletions
diff --git a/examples/README.md b/examples/README.md index 95804aabb..39324fdeb 100644 --- a/examples/README.md +++ b/examples/README.md @@ -17,9 +17,13 @@ A collecd-exec compatible middleware that gathers statistic values from nDPId. Tiny nDPId json dumper. Does not provide any useful funcationality besides dumping parsed JSON objects. -## go-dashboard +## c-simple -A discontinued tty UI nDPId dashboard. I've figured out that Go + UI is a bad idea, in particular if performance is a concern. +Very tiny integration example. + +## go-dashboard (DISCONTINUED!) + +A discontinued tty UI nDPId dashboard. ## py-flow-info diff --git a/examples/c-captured/c-captured.c b/examples/c-captured/c-captured.c index 01d2cd041..3437091f1 100644 --- a/examples/c-captured/c-captured.c +++ b/examples/c-captured/c-captured.c @@ -18,6 +18,7 @@ #include <unistd.h> #include <ndpi_typedefs.h> +#include <ndpi_api.h> #include "nDPIsrvd.h" #include "utarray.h" @@ -41,7 +42,6 @@ struct packet_data struct flow_user_data { - nDPIsrvd_ull flow_last_seen_ts_sec; uint8_t flow_new_seen; uint8_t detection_finished; uint8_t guessed; @@ -71,6 +71,19 @@ static ndpi_risk process_risky = NDPI_NO_RISK; static uint8_t process_midstream = 0; static uint8_t ignore_empty_flows = 0; +#ifdef ENABLE_MEMORY_PROFILING +void nDPIsrvd_memprof_log(char const * const format, ...) +{ + va_list ap; + + va_start(ap, format); + fprintf(stderr, "%s", "nDPIsrvd MemoryProfiler: "); + vfprintf(stderr, format, ap); + fprintf(stderr, "%s\n", ""); + va_end(ap); +} +#endif + static void packet_data_copy(void * dst, const void * src) { struct packet_data * const pd_dst = (struct packet_data *)dst; @@ -324,11 +337,14 @@ static enum nDPIsrvd_conversion_return perror_ull(enum nDPIsrvd_conversion_retur } static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_instance * const instance, struct nDPIsrvd_flow * const flow) { + (void)instance; + if (flow == NULL) { - return CALLBACK_OK; // We do not care for non flow/packet-flow events for NOW. + return CALLBACK_OK; // We do not care for non-flow events for NOW except for packet-flow events. } struct flow_user_data * const flow_user = (struct flow_user_data *)flow->flow_user_data; @@ -354,11 +370,8 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock return CALLBACK_ERROR; } - nDPIsrvd_ull pkt_ts_sec = 0ull; - perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_ts_sec"), &pkt_ts_sec), "pkt_ts_sec"); - - nDPIsrvd_ull pkt_ts_usec = 0ull; - perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_ts_usec"), &pkt_ts_usec), "pkt_ts_usec"); + nDPIsrvd_ull ts_msec = 0ull; + perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "ts_msec"), &ts_msec), "ts_msec"); nDPIsrvd_ull pkt_len = 0ull; perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_len"), &pkt_len), "pkt_len"); @@ -369,8 +382,8 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock nDPIsrvd_ull pkt_l4_offset = 0ull; perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_l4_offset"), &pkt_l4_offset), "pkt_l4_offset"); - struct packet_data pd = {.packet_ts_sec = pkt_ts_sec, - .packet_ts_usec = pkt_ts_usec, + struct packet_data pd = {.packet_ts_sec = ts_msec / 1000, + .packet_ts_usec = (ts_msec % 1000) * 1000, .packet_len = pkt_len, .base64_packet_size = pkt->value_length, .base64_packet_const = pkt->value}; @@ -385,9 +398,6 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "flow_tot_l4_payload_len"), &flow_user->flow_tot_l4_payload_len), "flow_tot_l4_payload_len"); - - perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "flow_last_seen"), &flow_user->flow_last_seen_ts_sec), - "flow_last_seen"); } if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "new") != 0) @@ -480,12 +490,14 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock return CALLBACK_OK; } -static void nDPIsrvd_write_flow_info_cb(int outfd, struct nDPIsrvd_flow * const flow) +static void nDPIsrvd_write_flow_info_cb(struct nDPIsrvd_flow const * const flow, void * user_data) { - struct flow_user_data * const flow_user = (struct flow_user_data *)flow->flow_user_data; + (void)user_data; + + struct flow_user_data const * const flow_user = (struct flow_user_data const *)flow->flow_user_data; - dprintf(outfd, - "[ptr: " + fprintf(stderr, + "[Flow %4llu][ptr: " #ifdef __LP64__ "0x%016llx" #else @@ -493,13 +505,14 @@ static void nDPIsrvd_write_flow_info_cb(int outfd, struct nDPIsrvd_flow * const #endif "][last-seen: %13llu][new-seen: %u][finished: %u][detected: %u][risky: " "%u][total-L4-payload-length: " - "%4llu][packets-captured: %u]", + "%4llu][packets-captured: %u]\n", + flow->id_as_ull, #ifdef __LP64__ (unsigned long long int)flow, #else (unsigned long int)flow, #endif - flow_user->flow_last_seen_ts_sec, + flow->last_seen, flow_user->flow_new_seen, flow_user->detection_finished, flow_user->detected, @@ -523,7 +536,7 @@ static void nDPIsrvd_write_flow_info_cb(int outfd, struct nDPIsrvd_flow * const #else (unsigned long int)flow, #endif - flow_user->flow_last_seen_ts_sec, + flow->last_seen, flow_user->flow_new_seen, flow_user->detection_finished, flow_user->detected, @@ -536,7 +549,7 @@ static void sighandler(int signum) { if (signum == SIGUSR1) { - nDPIsrvd_write_flow_info(2, sock, nDPIsrvd_write_flow_info_cb); + nDPIsrvd_flow_info(sock, nDPIsrvd_write_flow_info_cb, NULL); } else if (main_thread_shutdown == 0) { @@ -544,9 +557,14 @@ static void sighandler(int signum) } } -static void captured_flow_end_callback(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_flow * const flow) +static void captured_flow_cleanup_callback(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_instance * const instance, + struct nDPIsrvd_flow * const flow, + enum nDPIsrvd_cleanup_reason reason) { (void)sock; + (void)instance; + (void)reason; #ifdef VERBOSE printf("flow %llu end, remaining flows: %u\n", flow->id_as_ull, sock->flow_table->hh.tbl->num_items); @@ -758,7 +776,8 @@ static int mainloop(void) int main(int argc, char ** argv) { - sock = nDPIsrvd_init(0, sizeof(struct flow_user_data), captured_json_callback, captured_flow_end_callback); + sock = + nDPIsrvd_socket_init(0, sizeof(struct flow_user_data), captured_json_callback, captured_flow_cleanup_callback); if (sock == NULL) { fprintf(stderr, "%s: nDPIsrvd socket memory allocation failed!\n", argv[0]); @@ -777,7 +796,7 @@ int main(int argc, char ** argv) if (connect_ret != CONNECT_OK) { fprintf(stderr, "%s: nDPIsrvd socket connect to %s failed!\n", argv[0], serv_optarg); - nDPIsrvd_free(&sock); + nDPIsrvd_socket_free(&sock); return 1; } @@ -809,7 +828,7 @@ int main(int argc, char ** argv) int retval = mainloop(); - nDPIsrvd_free(&sock); + nDPIsrvd_socket_free(&sock); daemonize_shutdown(pidfile); closelog(); diff --git a/examples/c-collectd/c-collectd.c b/examples/c-collectd/c-collectd.c index d7d0819b4..c16a0847d 100644 --- a/examples/c-collectd/c-collectd.c +++ b/examples/c-collectd/c-collectd.c @@ -23,7 +23,6 @@ syslog(flags, format, __VA_ARGS__); \ } -static struct nDPIsrvd_socket * sock = NULL; static int main_thread_shutdown = 0; static int collectd_timerfd = -1; static pid_t collectd_pid; @@ -97,6 +96,19 @@ static struct uint64_t flow_l4_other_count; } collectd_statistics = {}; +#ifdef ENABLE_MEMORY_PROFILING +void nDPIsrvd_memprof_log(char const * const format, ...) +{ + va_list ap; + + va_start(ap, format); + fprintf(stderr, "%s", "nDPIsrvd MemoryProfiler: "); + vfprintf(stderr, format, ap); + fprintf(stderr, "%s\n", ""); + va_end(ap); +} +#endif + static int set_collectd_timer(void) { const time_t interval = collectd_interval_ull * 1000; @@ -132,7 +144,7 @@ static void sighandler(int signum) } } -static int parse_options(int argc, char ** argv) +static int parse_options(int argc, char ** argv, struct nDPIsrvd_socket * const sock) { int opt; @@ -344,7 +356,7 @@ static void print_collectd_exec_output(void) memset(&collectd_statistics, 0, sizeof(collectd_statistics)); } -static int mainloop(int epollfd) +static int mainloop(int epollfd, struct nDPIsrvd_socket * const sock) { struct epoll_event events[32]; size_t const events_size = sizeof(events) / sizeof(events[0]); @@ -427,9 +439,11 @@ static uint64_t get_total_flow_bytes(struct nDPIsrvd_socket * const sock) } static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_instance * const instance, struct nDPIsrvd_flow * const flow) { (void)sock; + (void)instance; (void)flow; struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name"); @@ -668,14 +682,14 @@ int main(int argc, char ** argv) openlog("nDPIsrvd-collectd", LOG_CONS, LOG_DAEMON); - sock = nDPIsrvd_init(0, 0, captured_json_callback, NULL); + struct nDPIsrvd_socket * sock = nDPIsrvd_socket_init(0, 0, captured_json_callback, NULL); if (sock == NULL) { LOG(LOG_DAEMON | LOG_ERR, "%s", "nDPIsrvd socket memory allocation failed!"); return 1; } - if (parse_options(argc, argv) != 0) + if (parse_options(argc, argv, sock) != 0) { return 1; } @@ -696,7 +710,7 @@ int main(int argc, char ** argv) if (connect_ret != CONNECT_OK) { LOG(LOG_DAEMON | LOG_ERR, "nDPIsrvd socket connect to %s failed!", serv_optarg); - nDPIsrvd_free(&sock); + nDPIsrvd_socket_free(&sock); return 1; } @@ -738,9 +752,9 @@ int main(int argc, char ** argv) } LOG(LOG_DAEMON | LOG_NOTICE, "%s", "Initialization succeeded."); - retval = mainloop(epollfd); + retval = mainloop(epollfd, sock); - nDPIsrvd_free(&sock); + nDPIsrvd_socket_free(&sock); close(collectd_timerfd); close(epollfd); closelog(); diff --git a/examples/c-simple/c-simple.c b/examples/c-simple/c-simple.c new file mode 100644 index 000000000..5af146d7b --- /dev/null +++ b/examples/c-simple/c-simple.c @@ -0,0 +1,162 @@ +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> + +#include "nDPIsrvd.h" + +static int main_thread_shutdown = 0; +static struct nDPIsrvd_socket * sock = NULL; + +#ifdef ENABLE_MEMORY_PROFILING +void nDPIsrvd_memprof_log(char const * const format, ...) +{ + va_list ap; + + va_start(ap, format); + fprintf(stderr, "%s", "nDPIsrvd MemoryProfiler: "); + vfprintf(stderr, format, ap); + fprintf(stderr, "%s\n", ""); + va_end(ap); +} +#endif + +static void nDPIsrvd_write_flow_info_cb(struct nDPIsrvd_flow const * const flow, void * user_data) +{ + (void)user_data; + + fprintf(stderr, + "[Flow %4llu][ptr: " +#ifdef __LP64__ + "0x%016llx" +#else + "0x%08lx" +#endif + "][last-seen: %13llu][idle-time: %13llu]\n", + flow->id_as_ull, +#ifdef __LP64__ + (unsigned long long int)flow, +#else + (unsigned long int)flow, +#endif + flow->last_seen, + flow->idle_time); +} + +static void nDPIsrvd_verify_flows_cb(struct nDPIsrvd_flow const * const flow, void * user_data) +{ + (void)user_data; + + fprintf(stderr, "Flow %llu verification failed\n", flow->id_as_ull); +} + +static void sighandler(int signum) +{ + struct nDPIsrvd_instance * current_instance; + struct nDPIsrvd_instance * itmp; + int verification_failed = 0; + + if (signum == SIGUSR1) + { + nDPIsrvd_flow_info(sock, nDPIsrvd_write_flow_info_cb, NULL); + + HASH_ITER(hh, sock->instance_table, current_instance, itmp) + { + if (nDPIsrvd_verify_flows(current_instance, nDPIsrvd_verify_flows_cb, NULL) != 0) + { + fprintf(stderr, "Flow verification failed for instance %d\n", current_instance->alias_source_key); + verification_failed = 1; + } + } + if (verification_failed == 0) + { + fprintf(stderr, "%s\n", "Flow verification succeeded."); + } + } + else if (main_thread_shutdown == 0) + { + main_thread_shutdown = 1; + } +} + +static enum nDPIsrvd_callback_return simple_json_callback(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_instance * const instance, + struct nDPIsrvd_flow * const flow) +{ + (void)sock; + (void)flow; + + struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name"); + if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "new") != 0) + { + printf("Instance %d, Flow %llu new\n", instance->alias_source_key, flow->id_as_ull); + } + + return CALLBACK_OK; +} + +static void simple_flow_cleanup_callback(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_instance * const instance, + struct nDPIsrvd_flow * const flow, + enum nDPIsrvd_cleanup_reason reason) +{ + (void)sock; + + char const * const reason_str = nDPIsrvd_enum_to_string(reason); + printf("Instance %d, Flow %llu cleanup, reason: %s\n", + instance->alias_source_key, + flow->id_as_ull, + (reason_str != NULL ? reason_str : "UNKNOWN")); + + if (reason == CLEANUP_REASON_FLOW_TIMEOUT) + { + printf("Did an nDPId instance die or was SIGKILL'ed?\n"); + exit(1); + } +} + +int main(int argc, char ** argv) +{ + (void)argc; + (void)argv; + + signal(SIGUSR1, sighandler); + signal(SIGINT, sighandler); + signal(SIGTERM, sighandler); + signal(SIGPIPE, sighandler); + + sock = nDPIsrvd_socket_init(0, 0, simple_json_callback, simple_flow_cleanup_callback); + if (sock == NULL) + { + return 1; + } + + if (nDPIsrvd_setup_address(&sock->address, "127.0.0.1:7000") != 0) + { + return 1; + } + + if (nDPIsrvd_connect(sock) != CONNECT_OK) + { + nDPIsrvd_socket_free(&sock); + return 1; + } + + enum nDPIsrvd_read_return read_ret; + while (main_thread_shutdown == 0 && (read_ret = nDPIsrvd_read(sock)) == READ_OK) + { + enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse_all(sock); + if (parse_ret != PARSE_NEED_MORE_DATA) + { + printf("Could not parse json string: %s\n", nDPIsrvd_enum_to_string(parse_ret)); + break; + } + } + + if (read_ret != READ_OK) + { + printf("Parse read %s\n", nDPIsrvd_enum_to_string(read_ret)); + } + + return 1; +} diff --git a/examples/go-dashboard/main.go b/examples/go-dashboard/main.go index 13c5d462b..c4cad3fb1 100644 --- a/examples/go-dashboard/main.go +++ b/examples/go-dashboard/main.go @@ -19,7 +19,7 @@ var ( InfoLogger *log.Logger ErrorLogger *log.Logger - NETWORK_BUFFER_MAX_SIZE uint16 = 12288 + NETWORK_BUFFER_MAX_SIZE uint16 = 13312 NETWORK_BUFFER_LENGTH_DIGITS uint16 = 5 ) @@ -30,11 +30,11 @@ type packet_event struct { FlowID uint32 `json:"flow_id"` FlowPacketID uint64 `json:"flow_packet_id"` + Timestamp uint64 `json:"ts_msec"` + PacketEventID uint8 `json:"packet_event_id"` PacketEventName string `json:"packet_event_name"` PacketOversize bool `json:"pkt_oversize"` - PacketTimestampS uint64 `json:"pkt_ts_sec"` - PacketTimestampUs uint64 `json:"pkt_ts_usec"` PacketLength uint32 `json:"pkt_len"` PacketL4Length uint32 `json:"pkt_l4_len"` Packet string `json:"pkt"` @@ -49,7 +49,7 @@ type flow_event struct { PacketID uint64 `json:"packet_id"` FlowID uint32 `json:"flow_id"` - FlowPacketID uint64 `json:"flow_packet_id"` + FlowPacketID uint64 `json:"flow_packets_processed"` FlowFirstSeen uint64 `json:"flow_first_seen"` FlowLastSeen uint64 `json:"flow_last_seen"` FlowTotalLayer4DataLength uint64 `json:"flow_tot_l4_data_len"` diff --git a/examples/py-flow-dashboard/flow-dash.py b/examples/py-flow-dashboard/flow-dash.py index 2bf95af42..8e49ed020 100755 --- a/examples/py-flow-dashboard/flow-dash.py +++ b/examples/py-flow-dashboard/flow-dash.py @@ -88,25 +88,26 @@ def update_pie(n): values = [0, 0, 0, 0, 0] for flow_id in shared_flow_dict.keys(): + try: + flow = shared_flow_dict[flow_id] + except KeyError: + continue - if shared_flow_dict[flow_id]['is_risky'] is True: + if flow['is_risky'] is True: values[0] += 1 - if shared_flow_dict[flow_id]['is_midstream'] is True: + if flow['is_midstream'] is True: values[1] += 1 - if shared_flow_dict[flow_id]['is_detected'] is True: + if flow['is_detected'] is True: values[2] += 1 - if shared_flow_dict[flow_id]['is_guessed'] is True: + if flow['is_guessed'] is True: values[3] += 1 - if shared_flow_dict[flow_id]['is_not_detected'] is True: + if flow['is_not_detected'] is True: values[4] += 1 - if shared_flow_dict[flow_id]['remove_me'] is True: - del shared_flow_dict[flow_id] - # print(values) return { @@ -121,8 +122,13 @@ def web_worker(): app.run_server() -def nDPIsrvd_worker_onJsonLineRecvd(json_dict, current_flow, global_user_data): - if 'flow_event_name' not in json_dict: +def nDPIsrvd_worker_onFlowCleanup(instance, current_flow, global_user_data): + del shared_flow_dict[current_flow.flow_id] + + return True + +def nDPIsrvd_worker_onJsonLineRecvd(json_dict, instance, current_flow, global_user_data): + if 'flow_id' not in json_dict: return True # print(json_dict) @@ -134,7 +140,9 @@ def nDPIsrvd_worker_onJsonLineRecvd(json_dict, current_flow, global_user_data): shared_flow_dict[json_dict['flow_id']]['is_not_detected'] = False shared_flow_dict[json_dict['flow_id']]['is_midstream'] = False shared_flow_dict[json_dict['flow_id']]['is_risky'] = False - shared_flow_dict[json_dict['flow_id']]['remove_me'] = False + + if 'flow_event_name' not in json_dict: + return True if json_dict['flow_event_name'] == 'new': if 'midstream' in json_dict and json_dict['midstream'] != 0: @@ -146,12 +154,8 @@ def nDPIsrvd_worker_onJsonLineRecvd(json_dict, current_flow, global_user_data): elif json_dict['flow_event_name'] == 'detected': shared_flow_dict[json_dict['flow_id']]['is_detected'] = True shared_flow_dict[json_dict['flow_id']]['is_guessed'] = False - shared_flow_dict[json_dict['flow_id']]['is_not_detected'] = False if 'ndpi' in json_dict and 'flow_risk' in json_dict['ndpi']: shared_flow_dict[json_dict['flow_id']]['is_risky'] = True - elif json_dict['flow_event_name'] == 'idle' or \ - json_dict['flow_event_name'] == 'end': - shared_flow_dict[json_dict['flow_id']]['remove_me'] = True return True @@ -165,7 +169,9 @@ def nDPIsrvd_worker(address, nDPIsrvd_global_user_data): nsock = nDPIsrvdSocket() nsock.connect(address) - nsock.loop(nDPIsrvd_worker_onJsonLineRecvd, nDPIsrvd_global_user_data) + nsock.loop(nDPIsrvd_worker_onJsonLineRecvd, + nDPIsrvd_worker_onFlowCleanup, + nDPIsrvd_global_user_data) if __name__ == '__main__': diff --git a/examples/py-flow-info/flow-info.py b/examples/py-flow-info/flow-info.py index 9dfa40957..1f25cea55 100755 --- a/examples/py-flow-info/flow-info.py +++ b/examples/py-flow-info/flow-info.py @@ -1,7 +1,9 @@ #!/usr/bin/env python3 import os +import math import sys +import time sys.path.append(os.path.dirname(sys.argv[0]) + '/../share/nDPId') sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId') @@ -16,6 +18,153 @@ except ImportError: global args global whois_db +def set_attr_from_dict(some_object, some_dict, key_and_attr_name, default_value): + try: + setattr(some_object, key_and_attr_name, some_dict[key_and_attr_name]) + except KeyError: + if default_value is not None and getattr(some_object, key_and_attr_name, None) is None: + setattr(some_object, key_and_attr_name, default_value) + +def set_attr_if_not_set(some_object, attr_name, value): + try: + getattr(some_object, attr_name) + except AttributeError: + setattr(some_object, attr_name, value) + +class Stats: + last_status_length = 0 + avg_xfer_json_bytes = 0.0 + expired_tot_l4_payload_len = 0 + expired_avg_l4_payload_len = 0 + total_flows = 0 + risky_flows = 0 + midstream_flows = 0 + guessed_flows = 0 + not_detected_flows = 0 + start_time = 0.0 + current_time = 0.0 + json_lines = 0 + spinner_state = 0 + + def __init__(self, nDPIsrvd_sock): + self.start_time = time.time() + self.nsock = nDPIsrvd_sock + + def updateSpinner(self): + if self.current_time + 0.25 <= time.time(): + self.spinner_state += 1 + + def getSpinner(self): + spinner_states = ['-', '\\', '|', '/'] + return spinner_states[self.spinner_state % len(spinner_states)] + + def getDataFromJson(self, json_dict, current_flow): + if current_flow is None: + return + + set_attr_from_dict(current_flow, json_dict, 'flow_tot_l4_payload_len', 0) + set_attr_from_dict(current_flow, json_dict, 'flow_avg_l4_payload_len', 0) + if 'ndpi' in json_dict: + set_attr_from_dict(current_flow, json_dict['ndpi'], 'flow_risk', {}) + else: + set_attr_from_dict(current_flow, {}, 'flow_risk', {}) + set_attr_from_dict(current_flow, json_dict, 'midstream', 0) + set_attr_from_dict(current_flow, json_dict, 'flow_event_name', '') + set_attr_if_not_set(current_flow, 'guessed', False) + set_attr_if_not_set(current_flow, 'not_detected', False) + if current_flow.flow_event_name == 'guessed': + current_flow.guessed = True + elif current_flow.flow_event_name == 'not-detected': + current_flow.not_detected = True + + def update(self, json_dict, current_flow): + self.updateSpinner() + self.json_lines += 1 + self.current_time = time.time() + self.avg_xfer_json_bytes = self.nsock.received_bytes / (self.current_time - self.start_time) + self.getDataFromJson(json_dict, current_flow) + + def updateOnCleanup(self, current_flow): + self.total_flows += 1 + self.expired_tot_l4_payload_len += current_flow.flow_tot_l4_payload_len + self.expired_avg_l4_payload_len += current_flow.flow_avg_l4_payload_len + self.risky_flows += 1 if len(current_flow.flow_risk) > 0 else 0 + self.midstream_flows += 1 if current_flow.midstream != 0 else 0 + self.guessed_flows += 1 if current_flow.guessed is True else 0 + self.not_detected_flows += 1 if current_flow.not_detected is True else 0 + + def getStatsFromFlowMgr(self): + alias_count = 0 + source_count = 0 + flow_count = 0 + flow_tot_l4_payload_len = 0.0 + flow_avg_l4_payload_len = 0.0 + risky = 0 + midstream = 0 + guessed = 0 + not_detected = 0 + + instances = self.nsock.flow_mgr.instances + for alias in instances: + alias_count += 1 + for source in instances[alias]: + source_count += 1 + for flow_id in instances[alias][source].flows: + flow_count += 1 + current_flow = instances[alias][source].flows[flow_id] + + flow_tot_l4_payload_len += current_flow.flow_tot_l4_payload_len + flow_avg_l4_payload_len += current_flow.flow_avg_l4_payload_len + risky += 1 if len(current_flow.flow_risk) > 0 else 0 + midstream += 1 if current_flow.midstream != 0 else 0 + guessed += 1 if current_flow.guessed is True else 0 + not_detected = 1 if current_flow.not_detected is True else 0 + + return alias_count, source_count, flow_count, \ + flow_tot_l4_payload_len, flow_avg_l4_payload_len, \ + risky, midstream, guessed, not_detected + + @staticmethod + def prettifyBytes(bytes_received): + size_names = ['B', 'KB', 'MB', 'GB', 'TB'] + if bytes_received == 0: + i = 0 + else: + i = min(int(math.floor(math.log(bytes_received, 1024))), len(size_names) - 1) + p = math.pow(1024, i) + s = round(bytes_received / p, 2) + return '{:.2f} {}'.format(s, size_names[i]) + + def resetStatus(self): + sys.stdout.write('\r' + str(' ' * self.last_status_length) + '\r') + sys.stdout.flush() + + def printStatus(self): + alias_count, source_count, flow_count, \ + tot_l4_payload_len, avg_l4_payload_len, \ + risky, midstream, guessed, not_detected = self.getStatsFromFlowMgr() + + out_str = '\r[n|tot|avg JSONs: {}|{}|{}/s] [tot|avg l4: {}|{}] ' \ + '[lss|srcs: {}|{}] ' \ + '[flws|rsky|mdstrm|!dtctd|gssd: {}|{}|{}|{}|{} / {}|{}|{}|{}|{}] [{}]' \ + ''.format(self.json_lines, + Stats.prettifyBytes(self.nsock.received_bytes), + Stats.prettifyBytes(self.avg_xfer_json_bytes), + Stats.prettifyBytes(tot_l4_payload_len + self.expired_tot_l4_payload_len), + Stats.prettifyBytes(avg_l4_payload_len + self.expired_avg_l4_payload_len), + alias_count, source_count, + flow_count, risky, midstream, not_detected, guessed, + flow_count + self.total_flows, + risky + self.risky_flows, + midstream + self.midstream_flows, + not_detected + self.not_detected_flows, + guessed + self.guessed_flows, + self.getSpinner()) + self.last_status_length = len(out_str) - 1 # '\r' + + sys.stdout.write(out_str) + sys.stdout.flush() + def prettifyEvent(color_list, whitespaces, text): term_attrs = str() for color in color_list: @@ -60,18 +209,35 @@ def whois(ip_str): return None return whois_db[ip_str] -def onJsonLineRecvd(json_dict, current_flow, global_user_data): +def onFlowCleanup(instance, current_flow, global_user_data): + stats = global_user_data + stats.updateOnCleanup(current_flow) + + return True + +def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data): + stats = global_user_data + stats.update(json_dict, current_flow) + stats.resetStatus() + instance_and_source = '' - instance_and_source += '[{}]'.format(TermColor.setColorByString(json_dict['alias'])) - instance_and_source += '[{}]'.format(TermColor.setColorByString(json_dict['source'])) + instance_and_source += '[{}]'.format(TermColor.setColorByString(instance.alias)) + instance_and_source += '[{}]'.format(TermColor.setColorByString(instance.source)) + if 'daemon_event_id' in json_dict: + print('{} {}: {}'.format(instance_and_source, prettifyEvent([TermColor.WARNING, TermColor.BLINK], 16, 'DAEMON-EVENT'), json_dict['daemon_event_name'])) + stats.printStatus() + return True if 'basic_event_id' in json_dict: - print('{} {}: {}'.format(instance_and_source, prettifyEvent([TermColor.WARNING, TermColor.BLINK], 16, 'BASIC-EVENT'), json_dict['basic_event_name'])) + print('{} {}: {}'.format(instance_and_source, prettifyEvent([TermColor.FAIL, TermColor.BLINK], 16, 'BASIC-EVENT'), json_dict['basic_event_name'])) + stats.printStatus() return True elif 'flow_event_id' not in json_dict: + stats.printStatus() return True if checkEventFilter(json_dict) is False: + stats.printStatus() return True ndpi_proto_categ_breed = '' @@ -99,8 +265,11 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data): line_suffix = '' flow_event_name = '' - if json_dict['flow_event_name'] == 'guessed' or json_dict['flow_event_name'] == 'not-detected': + if json_dict['flow_event_name'] == 'guessed': flow_event_name += '{}{:>16}{}'.format(TermColor.HINT, json_dict['flow_event_name'], TermColor.END) + elif json_dict['flow_event_name'] == 'not-detected': + flow_event_name += '{}{:>16}{}'.format(TermColor.WARNING + TermColor.BOLD + TermColor.BLINK, + json_dict['flow_event_name'], TermColor.END) else: if json_dict['flow_event_name'] == 'new': line_suffix = '' @@ -145,6 +314,8 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data): if len(ndpi_frisk) > 0: print('{} {:>18}{}'.format(instance_and_source, '', ndpi_frisk)) + stats.printStatus() + return True if __name__ == '__main__': @@ -169,4 +340,8 @@ if __name__ == '__main__': nsock = nDPIsrvdSocket() nsock.connect(address) - nsock.loop(onJsonLineRecvd, None) + stats = Stats(nsock) + try: + nsock.loop(onJsonLineRecvd, onFlowCleanup, stats) + except KeyboardInterrupt: + print('\n\nKeyboard Interrupt: cleaned up {} flows.'.format(len(nsock.shutdown()))) diff --git a/examples/py-flow-muliprocess/py-flow-multiprocess.py b/examples/py-flow-muliprocess/py-flow-multiprocess.py index 91bc693bc..b90ab536d 100755 --- a/examples/py-flow-muliprocess/py-flow-multiprocess.py +++ b/examples/py-flow-muliprocess/py-flow-multiprocess.py @@ -19,28 +19,41 @@ def mp_worker(unused, shared_flow_dict): import time while True: s = str() + n = int() + for key in shared_flow_dict.keys(): - s += '{}, '.format(str(key)) + try: + flow = shared_flow_dict[key] + except KeyError: + continue + + s += '{}, '.format(str(flow.flow_id)) + n += 1 + if len(s) == 0: s = '-' else: s = s[:-2] - print('Flows: {}'.format(s)) + + print('Flows({}): {}'.format(n, s)) time.sleep(1) -def nDPIsrvd_worker_onJsonLineRecvd(json_dict, current_flow, global_user_data): +def nDPIsrvd_worker_onFlowCleanup(instance, current_flow, global_user_data): + shared_flow_dict = global_user_data + + del shared_flow_dict[current_flow.flow_id] + + return True + + +def nDPIsrvd_worker_onJsonLineRecvd(json_dict, instance, current_flow, global_user_data): shared_flow_dict = global_user_data - if 'flow_event_name' not in json_dict: + if 'flow_id' not in json_dict: return True - if json_dict['flow_event_name'] == 'new': - shared_flow_dict[json_dict['flow_id']] = current_flow - elif json_dict['flow_event_name'] == 'idle' or \ - json_dict['flow_event_name'] == 'end': - if json_dict['flow_id'] in shared_flow_dict: - del shared_flow_dict[json_dict['flow_id']] + shared_flow_dict[current_flow.flow_id] = current_flow return True @@ -54,7 +67,9 @@ def nDPIsrvd_worker(address, shared_flow_dict): nsock = nDPIsrvdSocket() nsock.connect(address) - nsock.loop(nDPIsrvd_worker_onJsonLineRecvd, shared_flow_dict) + nsock.loop(nDPIsrvd_worker_onJsonLineRecvd, + nDPIsrvd_worker_onFlowCleanup, + shared_flow_dict) if __name__ == '__main__': diff --git a/examples/py-ja3-checker/py-ja3-checker.py b/examples/py-ja3-checker/py-ja3-checker.py index 3e7e9418f..b7f9df5b1 100755 --- a/examples/py-ja3-checker/py-ja3-checker.py +++ b/examples/py-ja3-checker/py-ja3-checker.py @@ -105,7 +105,7 @@ def getInfoFromJA3ER(ja3_hash): print('No fingerprint for JA3 {} found.'.format(ja3_hash)) -def onJsonLineRecvd(json_dict, current_flow, global_user_data): +def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data): if 'tls' in json_dict and 'ja3' in json_dict['tls']: if json_dict['tls']['client_requested_server_name'] == 'ja3er.com': @@ -139,7 +139,7 @@ if __name__ == '__main__': nsock = nDPIsrvdSocket() nsock.connect(address) try: - nsock.loop(onJsonLineRecvd, None) + nsock.loop(onJsonLineRecvd, None, None) except nDPIsrvd.SocketConnectionBroken as err: sys.stderr.write('\n{}\n'.format(err)) except KeyboardInterrupt: diff --git a/examples/py-json-stdout/json-stdout.py b/examples/py-json-stdout/json-stdout.py index 9693ecef6..160a61e0c 100755 --- a/examples/py-json-stdout/json-stdout.py +++ b/examples/py-json-stdout/json-stdout.py @@ -13,7 +13,7 @@ except ImportError: import nDPIsrvd from nDPIsrvd import nDPIsrvdSocket, TermColor -def onJsonLineRecvd(json_dict, current_flow, global_user_data): +def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data): print(json_dict) return True @@ -27,4 +27,4 @@ if __name__ == '__main__': nsock = nDPIsrvdSocket() nsock.connect(address) - nsock.loop(onJsonLineRecvd, None) + nsock.loop(onJsonLineRecvd, None, None) diff --git a/examples/py-schema-validation/py-schema-validation.py b/examples/py-schema-validation/py-schema-validation.py index 6273a5aa5..6a07681b3 100755 --- a/examples/py-schema-validation/py-schema-validation.py +++ b/examples/py-schema-validation/py-schema-validation.py @@ -18,7 +18,7 @@ class Stats: print_dot_every = 10 print_nmb_every = print_dot_every * 5 -def onJsonLineRecvd(json_dict, current_flow, global_user_data): +def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data): validation_done = nDPIsrvd.validateAgainstSchema(json_dict) global_user_data.lines_processed += 1 @@ -45,7 +45,7 @@ if __name__ == '__main__': nsock = nDPIsrvdSocket() nsock.connect(address) try: - nsock.loop(onJsonLineRecvd, Stats()) + nsock.loop(onJsonLineRecvd, None, Stats()) except nDPIsrvd.SocketConnectionBroken as err: sys.stderr.write('\n{}\n'.format(err)) except KeyboardInterrupt: diff --git a/examples/py-semantic-validation/py-semantic-validation.py b/examples/py-semantic-validation/py-semantic-validation.py index 4aebb8e09..109a968b3 100755 --- a/examples/py-semantic-validation/py-semantic-validation.py +++ b/examples/py-semantic-validation/py-semantic-validation.py @@ -17,14 +17,18 @@ except ImportError: class Stats: event_counter = dict() - lowest_flow_id_for_new_flow = 0 lines_processed = 0 print_dot_every = 10 print_nmb_every = print_dot_every * 5 + def __init__(self, nDPIsrvd_sock): + self.resetEventCounter() + self.nsock = nDPIsrvd_sock + def resetEventCounter(self): keys = ['init','reconnect','shutdown', \ - 'new','end','idle','guessed','detected','detection-update','not-detected', \ + 'new','end','idle','update', + 'guessed','detected','detection-update','not-detected', \ 'packet', 'packet-flow'] for k in keys: self.event_counter[k] = 0 @@ -55,7 +59,7 @@ class Stats: def getEventCounterStr(self): keys = [ [ 'init','reconnect','shutdown' ], \ - [ 'new','end','idle' ], \ + [ 'new','end','idle','update' ], \ [ 'guessed','detected','detection-update','not-detected' ], \ [ 'packet', 'packet-flow' ] ] retval = str() @@ -64,12 +68,8 @@ class Stats: for k in klist: retval += '| {:<16}: {:<4} '.format(k, self.event_counter[k]) retval += '\n--' + '-' * 98 + '\n' - retval += 'Lowest possible flow id (for new flows): {}\n'.format(self.lowest_flow_id_for_new_flow) return retval - def __init__(self): - self.resetEventCounter() - class SemanticValidationException(Exception): def __init__(self, current_flow, text): self.text = text @@ -80,72 +80,139 @@ class SemanticValidationException(Exception): else: return 'Flow ID {}: {}'.format(self.current_flow.flow_id, self.text) -def onJsonLineRecvd(json_dict, current_flow, global_user_data): - stats = global_user_data +def onFlowCleanup(instance, current_flow, global_user_data): + _, enable_timeout_check, _ = global_user_data + + if type(instance) is not nDPIsrvd.Instance: + raise SemanticValidationException(current_flow, + 'instance is not of type nDPIsrvd.Instance: ' \ + '{}'.format(type(instance))) + if type(current_flow) is not nDPIsrvd.Flow: + raise SemanticValidationException(current_flow, + 'current_flow is not of type nDPIsrvd.Flow: ' \ + '{}'.format(type(current_flow))) + if type(global_user_data) is not tuple: + raise SemanticValidationException(current_flow, + 'global_user_data is not of type tuple: ' \ + '{}'.format(type(global_user_data))) + + if current_flow.cleanup_reason == nDPIsrvd.FlowManager.CLEANUP_REASON_INVALID: + raise SemanticValidationException(current_flow, + 'Invalid flow cleanup reason') + + if current_flow.cleanup_reason == nDPIsrvd.FlowManager.CLEANUP_REASON_FLOW_TIMEOUT: + raise SemanticValidationException(current_flow, + 'Unexpected flow cleanup reason: CLEANUP_REASON_FLOW_TIMEOUT') + + if enable_timeout_check is True: + try: + l4_proto = current_flow.l4_proto + except AttributeError: + l4_proto = 'n/a' + + invalid_flows = stats.nsock.verify() + if len(invalid_flows) > 0: + invalid_flows_str = '' + for flow_id in invalid_flows: + flow = instance.flows[flow_id] + try: + l4_proto = flow.l4_proto + except AttributeError: + l4_proto = 'n/a' + invalid_flows_str += '{} proto[{},{}] ts[{} + {} < {}] diff[{}], '.format(flow_id, l4_proto, flow.flow_idle_time, + flow.flow_last_seen, flow.flow_idle_time, + instance.most_recent_flow_time, + instance.most_recent_flow_time - + (flow.flow_last_seen + flow.flow_idle_time)) + + raise SemanticValidationException(None, 'Flow Manager verification failed for: {}'.format(invalid_flows_str[:-2])) + + return True + +class ThreadData(object): + lowest_possible_flow_id = 0 + lowest_possible_packet_id = 0 + +def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data): + _, _, stats = global_user_data stats.incrementEventCounter(json_dict) - # dictionary unique for every flow, useful for flow specific semantic validation + if type(instance) is not nDPIsrvd.Instance: + raise SemanticValidationException(current_flow, + 'instance is not of type nDPIsrvd.Instance: ' \ + '{}'.format(type(instance))) + if type(current_flow) is not nDPIsrvd.Flow and current_flow is not None: + raise SemanticValidationException(current_flow, + 'current_flow is not of type nDPIsrvd.Flow: ' \ + '{}'.format(type(current_flow))) + if type(global_user_data) is not tuple: + raise SemanticValidationException(current_flow, + 'global_user_data is not of type tuple: ' \ + '{}'.format(type(global_user_data))) + if type(stats) is not Stats: + raise SemanticValidationException(current_flow, + 'stats is not of type Stats: ' \ + '{}'.format(type(stats))) + try: - semdict = current_flow.semdict + thread_data_dict = instance.thread_data except AttributeError: - try: - semdict = current_flow.semdict = dict() - except AttributeError: - semdict = dict() + thread_data_dict = instance.thread_data = dict() - if 'current_flow' in semdict: - if semdict['current_flow'] != current_flow: - raise SemanticValidationException(current_flow, - 'Semantic dictionary flow reference != current flow reference: ' \ - '{} != {}'.format(semdict['current_flow'], current_flow)) + if json_dict['thread_id'] in thread_data_dict: + td = thread_data_dict[json_dict['thread_id']] else: - semdict['current_flow'] = current_flow + td = thread_data_dict[json_dict['thread_id']] = ThreadData() + + lowest_possible_flow_id = td.lowest_possible_flow_id + lowest_possible_packet_id = td.lowest_possible_packet_id if current_flow is not None: - if 'pkt_ts_sec' in json_dict: - current_flow.last_pkt_seen = int(json_dict['pkt_ts_sec'] * 1000.0) - if 'pkt_ts_usec' in json_dict: - current_flow.last_pkt_seen += int(json_dict['pkt_ts_usec'] / 1000.0) - else: - raise SemanticValidationException(current_flow, - 'Got pkt_ts_sec but no pkt_ts_usec for packet id ' \ - '{}'.format(json_dict['packet_id'])) - if 'flow_id' in semdict: - semdict_thread_key = 'thread' + str(json_dict['thread_id']) - if semdict_thread_key in semdict: - if semdict[semdict_thread_key]['lowest_packet_id'] > json_dict['packet_id']: - raise SemanticValidationException(current_flow, - 'Invalid packet id for thread {} received: ' \ - 'expected packet id lesser or equal {}, ' \ - 'got {}'.format(json_dict['thread_id'], - semdict[semdict_thread_key]['lowest_packet_id'], - json_dict['packet_id'])) - else: - semdict[semdict_thread_key] = dict() - semdict[semdict_thread_key]['lowest_packet_id'] = json_dict['packet_id'] - - if semdict['flow_id'] != current_flow.flow_id or \ - semdict['flow_id'] != json_dict['flow_id']: - raise SemanticValidationException(current_flow, - 'Semantic dictionary flow id != current flow id != JSON dictionary flow id: ' \ - '{} != {} != {}'.format(semdict['flow_id'], \ - current_flow.flow_id, json_dict['flow_id'])) - else: - if json_dict['flow_id'] != current_flow.flow_id: - raise SemanticValidationException(current_flow, - 'JSON dictionary flow id != current flow id: ' \ - '{} != {}'.format(json_dict['flow_id'], current_flow.flow_id)) - semdict['flow_id'] = json_dict['flow_id'] + if instance.flows[current_flow.flow_id] != current_flow: + raise SemanticValidationException(current_flow, + 'FlowManager flow reference != current flow reference: ' \ + '{} != {}'.format(instance.flows[current_flow.flow_id], current_flow)) + + if 'l4_proto' in json_dict: + try: + l4_proto = current_flow.l4_proto + except AttributeError: + l4_proto = current_flow.l4_proto = json_dict['l4_proto'] + + if l4_proto != json_dict['l4_proto']: + raise SemanticValidationException(current_flow, 'Layer4 protocol mismatch: {} != {}'.format(l4_proto, json_dict['l4_proto'])) + elif json_dict['packet_event_name'] != 'packet-flow': + raise SemanticValidationException(current_flow, 'Layer4 protocol not found in JSON') + + if 'flow_last_seen' in json_dict: + if json_dict['flow_last_seen'] != current_flow.flow_last_seen: + raise SemanticValidationException(current_flow, 'Flow last seen: {} != {}'.format(json_dict['flow_last_seen'], + current_flow.flow_last_seen)) + + if 'flow_idle_time' in json_dict: + if json_dict['flow_idle_time'] != current_flow.flow_idle_time: + raise SemanticValidationException(current_flow, 'Flow idle time mismatch: {} != {}'.format(json_dict['flow_idle_time'], + current_flow.flow_idle_time)) + + if ('flow_last_seen' in json_dict and 'flow_idle_time' not in json_dict) or \ + ('flow_last_seen' not in json_dict and 'flow_idle_time' in json_dict): + raise SemanticValidationException(current_flow, + 'Got a JSON string with only one of both keys, ' \ + 'both required for timeout handling:' \ + 'flow_last_seen, flow_idle_time') + + if 'ts_msec' in json_dict: + current_flow.ts_msec = int(json_dict['ts_msec']) if 'flow_packet_id' in json_dict: try: - if json_dict['flow_packet_id'] != current_flow.low_packet_id + 1: + if json_dict['flow_packet_id'] != current_flow.flow_packet_id + 1: raise SemanticValidationException(current_flow, 'Invalid flow_packet_id seen, expected {}, got ' \ - '{}'.format(current_flow.low_packet_id + 1, json_dict['flow_packet_id'])) + '{}'.format(current_flow.flow_packet_id + 1, json_dict['flow_packet_id'])) else: - current_flow.low_packet_id += 1 + current_flow.flow_packet_id += 1 except AttributeError: pass @@ -156,14 +223,32 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data): except AttributeError: pass + if 'packet_event_name' in json_dict: + if json_dict['packet_event_name'] == 'packet-flow': + if lowest_possible_packet_id > json_dict['packet_id']: + raise SemanticValidationException(current_flow, + 'Invalid packet id for thread {} received: ' \ + 'expected packet id lesser or equal {}, ' \ + 'got {}'.format(json_dict['thread_id'], + lowest_possible_packet_id, + json_dict['packet_id'])) + td.lowest_possible_packet_id = lowest_possible_packet_id + + if 'flow_id' in json_dict: + if current_flow.flow_id != json_dict['flow_id']: + raise SemanticValidationException(current_flow, + 'Current flow id != JSON dictionary flow id: ' \ + '{} != {}'.format(current_flow.flow_id, json_dict['flow_id'])) + if 'flow_event_name' in json_dict: try: - if json_dict['flow_first_seen'] > current_flow.last_pkt_seen or \ - json_dict['flow_last_seen'] < current_flow.last_pkt_seen: + if json_dict['flow_first_seen'] > current_flow.ts_msec or \ + json_dict['flow_last_seen'] > current_flow.ts_msec or \ + json_dict['flow_first_seen'] > json_dict['flow_last_seen']: raise SemanticValidationException(current_flow, 'Last packet timestamp is invalid: ' \ - 'first_seen({}) <= {} <= last_seen({})'.format(json_dict['flow_first_seen'], - current_flow.last_pkt_seen, + 'first_seen({}) <= {} >= last_seen({})'.format(json_dict['flow_first_seen'], + current_flow.ts_msec, json_dict['flow_last_seen'])) except AttributeError: if json_dict['flow_event_name'] == 'new': @@ -173,10 +258,10 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data): json_dict['flow_event_name'] == 'idle': current_flow.flow_ended = True elif json_dict['flow_event_name'] == 'new': - if stats.lowest_flow_id_for_new_flow > current_flow.flow_id: + if lowest_possible_flow_id > current_flow.flow_id: raise SemanticValidationException(current_flow, 'JSON dictionary lowest flow id for new flow > current flow id: ' \ - '{} != {}'.format(stats.lowest_flow_id_for_new_flow, current_flow.flow_id)) + '{} != {}'.format(lowest_possible_flow_id, current_flow.flow_id)) try: if current_flow.flow_new_seen == True: raise SemanticValidationException(current_flow, @@ -185,8 +270,8 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data): pass current_flow.flow_new_seen = True current_flow.flow_packet_id = 0 - if stats.lowest_flow_id_for_new_flow == 0: - stats.lowest_flow_id_for_new_flow = current_flow.flow_id + if lowest_possible_flow_id == 0: + td.lowest_possible_flow_id = current_flow.flow_id elif json_dict['flow_event_name'] == 'detected' or \ json_dict['flow_event_name'] == 'not-detected': try: @@ -198,19 +283,19 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data): current_flow.flow_detection_finished = True try: - if current_flow.flow_new_seen is True and stats.lowest_flow_id_for_new_flow > current_flow.flow_id: + if current_flow.flow_new_seen is True and lowest_possible_flow_id > current_flow.flow_id: raise SemanticValidationException(current_flow, 'Lowest flow id for flow > current flow id: ' \ - '{} > {}'.format(stats.lowest_flow_id_for_new_flow, current_flow.flow_id)) + '{} > {}'.format(lowest_possible_flow_id, current_flow.flow_id)) except AttributeError: pass - global_user_data.lines_processed += 1 - if global_user_data.lines_processed % global_user_data.print_dot_every == 0: + stats.lines_processed += 1 + if stats.lines_processed % stats.print_dot_every == 0: sys.stdout.write('.') sys.stdout.flush() - print_nmb_every = global_user_data.print_nmb_every + (len(str(global_user_data.lines_processed)) * global_user_data.print_dot_every) - if global_user_data.lines_processed % print_nmb_every == 0: - sys.stdout.write(str(global_user_data.lines_processed)) + print_nmb_every = stats.print_nmb_every + (len(str(stats.lines_processed)) * stats.print_dot_every) + if stats.lines_processed % print_nmb_every == 0: + sys.stdout.write(str(stats.lines_processed)) sys.stdout.flush() return True @@ -218,6 +303,8 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data): if __name__ == '__main__': argparser = nDPIsrvd.defaultArgumentParser() argparser.add_argument('--strict', action='store_true', default=False, help='Require and validate a full nDPId application lifecycle.') + argparser.add_argument('--enable-timeout-check', action='store_true', default=False, + help='Enable additional flow timeout validation. See README.md for more information') args = argparser.parse_args() address = nDPIsrvd.validateAddress(args) @@ -226,9 +313,9 @@ if __name__ == '__main__': nsock = nDPIsrvdSocket() nsock.connect(address) - stats = Stats() + stats = Stats(nsock) try: - nsock.loop(onJsonLineRecvd, stats) + nsock.loop(onJsonLineRecvd, onFlowCleanup, (args.strict, args.enable_timeout_check, stats)) except nDPIsrvd.SocketConnectionBroken as err: sys.stderr.write('\n{}\n'.format(err)) except KeyboardInterrupt: |