author    Toni Uhlig <matzeton@googlemail.com>  2021-12-15 23:25:32 +0100
committer Toni Uhlig <matzeton@googlemail.com>  2022-01-20 00:50:38 +0100
commit    9e07a57566cc45bf92a845d8cee968d72e0f314e (patch)
tree      8f1a6bfd08bd68a5253fadf3a01beecda77b1c95 /examples
parent    a35fc1d5ea8570609cc0c8cf6edadc81f8f5bb76 (diff)
Major nDPId extension. Sorry for the huge commit.
- nDPId: fixed an invalid IPv4/IPv6 tuple compare
- nDPIsrvd: fixed a caching issue (finally)
- added a tiny C example (can be used to check flow manager sanity)
- c-captured: use the flow_last_seen timestamp from `struct nDPIsrvd_flow`
- README.md update: added an example JSON sequence
- nDPId: added the new flow event `update`, necessary for correct timeout handling (and other future use cases)
- nDPIsrvd.h and nDPIsrvd.py: switched to an instance-based flow manager (an instance consists of an alias/source tuple)
- every flow-related event **must** now serialize `alias`, `source`, `flow_id`, `flow_last_seen` and `flow_idle_time` to make timeout handling and verification work correctly (see the sketch after this message)
- nDPIsrvd.h: ability to profile any dynamic memory (de-)allocation
- nDPIsrvd.py: removed the PcapPacket class (unused)
- py-flow-dashboard and py-flow-multiprocess: fixed a race condition
- py-flow-info: print a status bar with probably useful information
- nDPId/nDPIsrvd.h: switched from packet-flow-only timestamps (`pkt_*sec`) to a generic flow event timestamp `ts_msec`
- nDPId-test: added additional checks
- nDPId: increased the ICMP flow timeout
- nDPId: use event-based I/O when capturing packets from a device
- nDPIsrvd: fixed a memory leak on shutdown if remote descriptors were still connected

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
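
A hypothetical, consumer-side sketch (not part of this commit) of what the new per-event fields enable: because every flow event now carries `alias`, `source`, `flow_id`, `flow_last_seen` and `flow_idle_time`, a client can group flows per instance and flag any flow whose `flow_last_seen + flow_idle_time` lies behind the newest timestamp seen on that instance — the same check py-semantic-validation performs. The field names come from this commit; the helper names and data layout below are illustrative only.

flows = {}                  # (alias, source) -> {flow_id: (flow_last_seen, flow_idle_time)}
most_recent_flow_time = {}  # (alias, source) -> newest flow_last_seen observed on that instance

def on_flow_event(json_dict):
    # Every flow event is expected to carry these five keys.
    instance_key = (json_dict['alias'], json_dict['source'])
    per_instance = flows.setdefault(instance_key, {})
    per_instance[json_dict['flow_id']] = (int(json_dict['flow_last_seen']),
                                          int(json_dict['flow_idle_time']))
    most_recent_flow_time[instance_key] = max(most_recent_flow_time.get(instance_key, 0),
                                              int(json_dict['flow_last_seen']))

def verify_flows(instance_key):
    # A flow should already have been cleaned up once
    # flow_last_seen + flow_idle_time < newest timestamp seen on that instance.
    newest = most_recent_flow_time.get(instance_key, 0)
    return [flow_id
            for flow_id, (last_seen, idle_time) in flows.get(instance_key, {}).items()
            if last_seen + idle_time < newest]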
Diffstat (limited to 'examples')
-rw-r--r--  examples/README.md                                         |   8
-rw-r--r--  examples/c-captured/c-captured.c                           |  67
-rw-r--r--  examples/c-collectd/c-collectd.c                           |  30
-rw-r--r--  examples/c-simple/c-simple.c                               | 162
-rw-r--r--  examples/go-dashboard/main.go                              |   8
-rwxr-xr-x  examples/py-flow-dashboard/flow-dash.py                    |  38
-rwxr-xr-x  examples/py-flow-info/flow-info.py                         | 187
-rwxr-xr-x  examples/py-flow-muliprocess/py-flow-multiprocess.py       |  37
-rwxr-xr-x  examples/py-ja3-checker/py-ja3-checker.py                  |   4
-rwxr-xr-x  examples/py-json-stdout/json-stdout.py                     |   4
-rwxr-xr-x  examples/py-schema-validation/py-schema-validation.py      |   4
-rwxr-xr-x  examples/py-semantic-validation/py-semantic-validation.py  | 237
12 files changed, 634 insertions, 152 deletions
diff --git a/examples/README.md b/examples/README.md
index 95804aabb..39324fdeb 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -17,9 +17,13 @@ A collecd-exec compatible middleware that gathers statistic values from nDPId.
Tiny nDPId json dumper. Does not provide any useful funcationality besides dumping parsed JSON objects.
-## go-dashboard
+## c-simple
-A discontinued tty UI nDPId dashboard. I've figured out that Go + UI is a bad idea, in particular if performance is a concern.
+Very tiny integration example.
+
+## go-dashboard (DISCONTINUED!)
+
+A discontinued tty UI nDPId dashboard.
## py-flow-info
diff --git a/examples/c-captured/c-captured.c b/examples/c-captured/c-captured.c
index 01d2cd041..3437091f1 100644
--- a/examples/c-captured/c-captured.c
+++ b/examples/c-captured/c-captured.c
@@ -18,6 +18,7 @@
#include <unistd.h>
#include <ndpi_typedefs.h>
+#include <ndpi_api.h>
#include "nDPIsrvd.h"
#include "utarray.h"
@@ -41,7 +42,6 @@ struct packet_data
struct flow_user_data
{
- nDPIsrvd_ull flow_last_seen_ts_sec;
uint8_t flow_new_seen;
uint8_t detection_finished;
uint8_t guessed;
@@ -71,6 +71,19 @@ static ndpi_risk process_risky = NDPI_NO_RISK;
static uint8_t process_midstream = 0;
static uint8_t ignore_empty_flows = 0;
+#ifdef ENABLE_MEMORY_PROFILING
+void nDPIsrvd_memprof_log(char const * const format, ...)
+{
+ va_list ap;
+
+ va_start(ap, format);
+ fprintf(stderr, "%s", "nDPIsrvd MemoryProfiler: ");
+ vfprintf(stderr, format, ap);
+ fprintf(stderr, "%s\n", "");
+ va_end(ap);
+}
+#endif
+
static void packet_data_copy(void * dst, const void * src)
{
struct packet_data * const pd_dst = (struct packet_data *)dst;
@@ -324,11 +337,14 @@ static enum nDPIsrvd_conversion_return perror_ull(enum nDPIsrvd_conversion_retur
}
static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_instance * const instance,
struct nDPIsrvd_flow * const flow)
{
+ (void)instance;
+
if (flow == NULL)
{
- return CALLBACK_OK; // We do not care for non flow/packet-flow events for NOW.
+ return CALLBACK_OK; // We do not care for non-flow events for NOW except for packet-flow events.
}
struct flow_user_data * const flow_user = (struct flow_user_data *)flow->flow_user_data;
@@ -354,11 +370,8 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock
return CALLBACK_ERROR;
}
- nDPIsrvd_ull pkt_ts_sec = 0ull;
- perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_ts_sec"), &pkt_ts_sec), "pkt_ts_sec");
-
- nDPIsrvd_ull pkt_ts_usec = 0ull;
- perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_ts_usec"), &pkt_ts_usec), "pkt_ts_usec");
+ nDPIsrvd_ull ts_msec = 0ull;
+ perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "ts_msec"), &ts_msec), "ts_msec");
nDPIsrvd_ull pkt_len = 0ull;
perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_len"), &pkt_len), "pkt_len");
@@ -369,8 +382,8 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock
nDPIsrvd_ull pkt_l4_offset = 0ull;
perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_l4_offset"), &pkt_l4_offset), "pkt_l4_offset");
- struct packet_data pd = {.packet_ts_sec = pkt_ts_sec,
- .packet_ts_usec = pkt_ts_usec,
+ struct packet_data pd = {.packet_ts_sec = ts_msec / 1000,
+ .packet_ts_usec = (ts_msec % 1000) * 1000,
.packet_len = pkt_len,
.base64_packet_size = pkt->value_length,
.base64_packet_const = pkt->value};
@@ -385,9 +398,6 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock
perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "flow_tot_l4_payload_len"),
&flow_user->flow_tot_l4_payload_len),
"flow_tot_l4_payload_len");
-
- perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "flow_last_seen"), &flow_user->flow_last_seen_ts_sec),
- "flow_last_seen");
}
if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "new") != 0)
@@ -480,12 +490,14 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock
return CALLBACK_OK;
}
-static void nDPIsrvd_write_flow_info_cb(int outfd, struct nDPIsrvd_flow * const flow)
+static void nDPIsrvd_write_flow_info_cb(struct nDPIsrvd_flow const * const flow, void * user_data)
{
- struct flow_user_data * const flow_user = (struct flow_user_data *)flow->flow_user_data;
+ (void)user_data;
+
+ struct flow_user_data const * const flow_user = (struct flow_user_data const *)flow->flow_user_data;
- dprintf(outfd,
- "[ptr: "
+ fprintf(stderr,
+ "[Flow %4llu][ptr: "
#ifdef __LP64__
"0x%016llx"
#else
@@ -493,13 +505,14 @@ static void nDPIsrvd_write_flow_info_cb(int outfd, struct nDPIsrvd_flow * const
#endif
"][last-seen: %13llu][new-seen: %u][finished: %u][detected: %u][risky: "
"%u][total-L4-payload-length: "
- "%4llu][packets-captured: %u]",
+ "%4llu][packets-captured: %u]\n",
+ flow->id_as_ull,
#ifdef __LP64__
(unsigned long long int)flow,
#else
(unsigned long int)flow,
#endif
- flow_user->flow_last_seen_ts_sec,
+ flow->last_seen,
flow_user->flow_new_seen,
flow_user->detection_finished,
flow_user->detected,
@@ -523,7 +536,7 @@ static void nDPIsrvd_write_flow_info_cb(int outfd, struct nDPIsrvd_flow * const
#else
(unsigned long int)flow,
#endif
- flow_user->flow_last_seen_ts_sec,
+ flow->last_seen,
flow_user->flow_new_seen,
flow_user->detection_finished,
flow_user->detected,
@@ -536,7 +549,7 @@ static void sighandler(int signum)
{
if (signum == SIGUSR1)
{
- nDPIsrvd_write_flow_info(2, sock, nDPIsrvd_write_flow_info_cb);
+ nDPIsrvd_flow_info(sock, nDPIsrvd_write_flow_info_cb, NULL);
}
else if (main_thread_shutdown == 0)
{
@@ -544,9 +557,14 @@ static void sighandler(int signum)
}
}
-static void captured_flow_end_callback(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_flow * const flow)
+static void captured_flow_cleanup_callback(struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_instance * const instance,
+ struct nDPIsrvd_flow * const flow,
+ enum nDPIsrvd_cleanup_reason reason)
{
(void)sock;
+ (void)instance;
+ (void)reason;
#ifdef VERBOSE
printf("flow %llu end, remaining flows: %u\n", flow->id_as_ull, sock->flow_table->hh.tbl->num_items);
@@ -758,7 +776,8 @@ static int mainloop(void)
int main(int argc, char ** argv)
{
- sock = nDPIsrvd_init(0, sizeof(struct flow_user_data), captured_json_callback, captured_flow_end_callback);
+ sock =
+ nDPIsrvd_socket_init(0, sizeof(struct flow_user_data), captured_json_callback, captured_flow_cleanup_callback);
if (sock == NULL)
{
fprintf(stderr, "%s: nDPIsrvd socket memory allocation failed!\n", argv[0]);
@@ -777,7 +796,7 @@ int main(int argc, char ** argv)
if (connect_ret != CONNECT_OK)
{
fprintf(stderr, "%s: nDPIsrvd socket connect to %s failed!\n", argv[0], serv_optarg);
- nDPIsrvd_free(&sock);
+ nDPIsrvd_socket_free(&sock);
return 1;
}
@@ -809,7 +828,7 @@ int main(int argc, char ** argv)
int retval = mainloop();
- nDPIsrvd_free(&sock);
+ nDPIsrvd_socket_free(&sock);
daemonize_shutdown(pidfile);
closelog();
diff --git a/examples/c-collectd/c-collectd.c b/examples/c-collectd/c-collectd.c
index d7d0819b4..c16a0847d 100644
--- a/examples/c-collectd/c-collectd.c
+++ b/examples/c-collectd/c-collectd.c
@@ -23,7 +23,6 @@
syslog(flags, format, __VA_ARGS__); \
}
-static struct nDPIsrvd_socket * sock = NULL;
static int main_thread_shutdown = 0;
static int collectd_timerfd = -1;
static pid_t collectd_pid;
@@ -97,6 +96,19 @@ static struct
uint64_t flow_l4_other_count;
} collectd_statistics = {};
+#ifdef ENABLE_MEMORY_PROFILING
+void nDPIsrvd_memprof_log(char const * const format, ...)
+{
+ va_list ap;
+
+ va_start(ap, format);
+ fprintf(stderr, "%s", "nDPIsrvd MemoryProfiler: ");
+ vfprintf(stderr, format, ap);
+ fprintf(stderr, "%s\n", "");
+ va_end(ap);
+}
+#endif
+
static int set_collectd_timer(void)
{
const time_t interval = collectd_interval_ull * 1000;
@@ -132,7 +144,7 @@ static void sighandler(int signum)
}
}
-static int parse_options(int argc, char ** argv)
+static int parse_options(int argc, char ** argv, struct nDPIsrvd_socket * const sock)
{
int opt;
@@ -344,7 +356,7 @@ static void print_collectd_exec_output(void)
memset(&collectd_statistics, 0, sizeof(collectd_statistics));
}
-static int mainloop(int epollfd)
+static int mainloop(int epollfd, struct nDPIsrvd_socket * const sock)
{
struct epoll_event events[32];
size_t const events_size = sizeof(events) / sizeof(events[0]);
@@ -427,9 +439,11 @@ static uint64_t get_total_flow_bytes(struct nDPIsrvd_socket * const sock)
}
static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_instance * const instance,
struct nDPIsrvd_flow * const flow)
{
(void)sock;
+ (void)instance;
(void)flow;
struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name");
@@ -668,14 +682,14 @@ int main(int argc, char ** argv)
openlog("nDPIsrvd-collectd", LOG_CONS, LOG_DAEMON);
- sock = nDPIsrvd_init(0, 0, captured_json_callback, NULL);
+ struct nDPIsrvd_socket * sock = nDPIsrvd_socket_init(0, 0, captured_json_callback, NULL);
if (sock == NULL)
{
LOG(LOG_DAEMON | LOG_ERR, "%s", "nDPIsrvd socket memory allocation failed!");
return 1;
}
- if (parse_options(argc, argv) != 0)
+ if (parse_options(argc, argv, sock) != 0)
{
return 1;
}
@@ -696,7 +710,7 @@ int main(int argc, char ** argv)
if (connect_ret != CONNECT_OK)
{
LOG(LOG_DAEMON | LOG_ERR, "nDPIsrvd socket connect to %s failed!", serv_optarg);
- nDPIsrvd_free(&sock);
+ nDPIsrvd_socket_free(&sock);
return 1;
}
@@ -738,9 +752,9 @@ int main(int argc, char ** argv)
}
LOG(LOG_DAEMON | LOG_NOTICE, "%s", "Initialization succeeded.");
- retval = mainloop(epollfd);
+ retval = mainloop(epollfd, sock);
- nDPIsrvd_free(&sock);
+ nDPIsrvd_socket_free(&sock);
close(collectd_timerfd);
close(epollfd);
closelog();
diff --git a/examples/c-simple/c-simple.c b/examples/c-simple/c-simple.c
new file mode 100644
index 000000000..5af146d7b
--- /dev/null
+++ b/examples/c-simple/c-simple.c
@@ -0,0 +1,162 @@
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "nDPIsrvd.h"
+
+static int main_thread_shutdown = 0;
+static struct nDPIsrvd_socket * sock = NULL;
+
+#ifdef ENABLE_MEMORY_PROFILING
+void nDPIsrvd_memprof_log(char const * const format, ...)
+{
+ va_list ap;
+
+ va_start(ap, format);
+ fprintf(stderr, "%s", "nDPIsrvd MemoryProfiler: ");
+ vfprintf(stderr, format, ap);
+ fprintf(stderr, "%s\n", "");
+ va_end(ap);
+}
+#endif
+
+static void nDPIsrvd_write_flow_info_cb(struct nDPIsrvd_flow const * const flow, void * user_data)
+{
+ (void)user_data;
+
+ fprintf(stderr,
+ "[Flow %4llu][ptr: "
+#ifdef __LP64__
+ "0x%016llx"
+#else
+ "0x%08lx"
+#endif
+ "][last-seen: %13llu][idle-time: %13llu]\n",
+ flow->id_as_ull,
+#ifdef __LP64__
+ (unsigned long long int)flow,
+#else
+ (unsigned long int)flow,
+#endif
+ flow->last_seen,
+ flow->idle_time);
+}
+
+static void nDPIsrvd_verify_flows_cb(struct nDPIsrvd_flow const * const flow, void * user_data)
+{
+ (void)user_data;
+
+ fprintf(stderr, "Flow %llu verification failed\n", flow->id_as_ull);
+}
+
+static void sighandler(int signum)
+{
+ struct nDPIsrvd_instance * current_instance;
+ struct nDPIsrvd_instance * itmp;
+ int verification_failed = 0;
+
+ if (signum == SIGUSR1)
+ {
+ nDPIsrvd_flow_info(sock, nDPIsrvd_write_flow_info_cb, NULL);
+
+ HASH_ITER(hh, sock->instance_table, current_instance, itmp)
+ {
+ if (nDPIsrvd_verify_flows(current_instance, nDPIsrvd_verify_flows_cb, NULL) != 0)
+ {
+ fprintf(stderr, "Flow verification failed for instance %d\n", current_instance->alias_source_key);
+ verification_failed = 1;
+ }
+ }
+ if (verification_failed == 0)
+ {
+ fprintf(stderr, "%s\n", "Flow verification succeeded.");
+ }
+ }
+ else if (main_thread_shutdown == 0)
+ {
+ main_thread_shutdown = 1;
+ }
+}
+
+static enum nDPIsrvd_callback_return simple_json_callback(struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_instance * const instance,
+ struct nDPIsrvd_flow * const flow)
+{
+ (void)sock;
+ (void)flow;
+
+ struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name");
+ if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "new") != 0)
+ {
+ printf("Instance %d, Flow %llu new\n", instance->alias_source_key, flow->id_as_ull);
+ }
+
+ return CALLBACK_OK;
+}
+
+static void simple_flow_cleanup_callback(struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_instance * const instance,
+ struct nDPIsrvd_flow * const flow,
+ enum nDPIsrvd_cleanup_reason reason)
+{
+ (void)sock;
+
+ char const * const reason_str = nDPIsrvd_enum_to_string(reason);
+ printf("Instance %d, Flow %llu cleanup, reason: %s\n",
+ instance->alias_source_key,
+ flow->id_as_ull,
+ (reason_str != NULL ? reason_str : "UNKNOWN"));
+
+ if (reason == CLEANUP_REASON_FLOW_TIMEOUT)
+ {
+ printf("Did an nDPId instance die or was SIGKILL'ed?\n");
+ exit(1);
+ }
+}
+
+int main(int argc, char ** argv)
+{
+ (void)argc;
+ (void)argv;
+
+ signal(SIGUSR1, sighandler);
+ signal(SIGINT, sighandler);
+ signal(SIGTERM, sighandler);
+ signal(SIGPIPE, sighandler);
+
+ sock = nDPIsrvd_socket_init(0, 0, simple_json_callback, simple_flow_cleanup_callback);
+ if (sock == NULL)
+ {
+ return 1;
+ }
+
+ if (nDPIsrvd_setup_address(&sock->address, "127.0.0.1:7000") != 0)
+ {
+ return 1;
+ }
+
+ if (nDPIsrvd_connect(sock) != CONNECT_OK)
+ {
+ nDPIsrvd_socket_free(&sock);
+ return 1;
+ }
+
+ enum nDPIsrvd_read_return read_ret;
+ while (main_thread_shutdown == 0 && (read_ret = nDPIsrvd_read(sock)) == READ_OK)
+ {
+ enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse_all(sock);
+ if (parse_ret != PARSE_NEED_MORE_DATA)
+ {
+ printf("Could not parse json string: %s\n", nDPIsrvd_enum_to_string(parse_ret));
+ break;
+ }
+ }
+
+ if (read_ret != READ_OK)
+ {
+ printf("Parse read %s\n", nDPIsrvd_enum_to_string(read_ret));
+ }
+
+ return 1;
+}
diff --git a/examples/go-dashboard/main.go b/examples/go-dashboard/main.go
index 13c5d462b..c4cad3fb1 100644
--- a/examples/go-dashboard/main.go
+++ b/examples/go-dashboard/main.go
@@ -19,7 +19,7 @@ var (
InfoLogger *log.Logger
ErrorLogger *log.Logger
- NETWORK_BUFFER_MAX_SIZE uint16 = 12288
+ NETWORK_BUFFER_MAX_SIZE uint16 = 13312
NETWORK_BUFFER_LENGTH_DIGITS uint16 = 5
)
@@ -30,11 +30,11 @@ type packet_event struct {
FlowID uint32 `json:"flow_id"`
FlowPacketID uint64 `json:"flow_packet_id"`
+ Timestamp uint64 `json:"ts_msec"`
+
PacketEventID uint8 `json:"packet_event_id"`
PacketEventName string `json:"packet_event_name"`
PacketOversize bool `json:"pkt_oversize"`
- PacketTimestampS uint64 `json:"pkt_ts_sec"`
- PacketTimestampUs uint64 `json:"pkt_ts_usec"`
PacketLength uint32 `json:"pkt_len"`
PacketL4Length uint32 `json:"pkt_l4_len"`
Packet string `json:"pkt"`
@@ -49,7 +49,7 @@ type flow_event struct {
PacketID uint64 `json:"packet_id"`
FlowID uint32 `json:"flow_id"`
- FlowPacketID uint64 `json:"flow_packet_id"`
+ FlowPacketID uint64 `json:"flow_packets_processed"`
FlowFirstSeen uint64 `json:"flow_first_seen"`
FlowLastSeen uint64 `json:"flow_last_seen"`
FlowTotalLayer4DataLength uint64 `json:"flow_tot_l4_data_len"`
diff --git a/examples/py-flow-dashboard/flow-dash.py b/examples/py-flow-dashboard/flow-dash.py
index 2bf95af42..8e49ed020 100755
--- a/examples/py-flow-dashboard/flow-dash.py
+++ b/examples/py-flow-dashboard/flow-dash.py
@@ -88,25 +88,26 @@ def update_pie(n):
values = [0, 0, 0, 0, 0]
for flow_id in shared_flow_dict.keys():
+ try:
+ flow = shared_flow_dict[flow_id]
+ except KeyError:
+ continue
- if shared_flow_dict[flow_id]['is_risky'] is True:
+ if flow['is_risky'] is True:
values[0] += 1
- if shared_flow_dict[flow_id]['is_midstream'] is True:
+ if flow['is_midstream'] is True:
values[1] += 1
- if shared_flow_dict[flow_id]['is_detected'] is True:
+ if flow['is_detected'] is True:
values[2] += 1
- if shared_flow_dict[flow_id]['is_guessed'] is True:
+ if flow['is_guessed'] is True:
values[3] += 1
- if shared_flow_dict[flow_id]['is_not_detected'] is True:
+ if flow['is_not_detected'] is True:
values[4] += 1
- if shared_flow_dict[flow_id]['remove_me'] is True:
- del shared_flow_dict[flow_id]
-
# print(values)
return {
@@ -121,8 +122,13 @@ def web_worker():
app.run_server()
-def nDPIsrvd_worker_onJsonLineRecvd(json_dict, current_flow, global_user_data):
- if 'flow_event_name' not in json_dict:
+def nDPIsrvd_worker_onFlowCleanup(instance, current_flow, global_user_data):
+ del shared_flow_dict[current_flow.flow_id]
+
+ return True
+
+def nDPIsrvd_worker_onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
+ if 'flow_id' not in json_dict:
return True
# print(json_dict)
@@ -134,7 +140,9 @@ def nDPIsrvd_worker_onJsonLineRecvd(json_dict, current_flow, global_user_data):
shared_flow_dict[json_dict['flow_id']]['is_not_detected'] = False
shared_flow_dict[json_dict['flow_id']]['is_midstream'] = False
shared_flow_dict[json_dict['flow_id']]['is_risky'] = False
- shared_flow_dict[json_dict['flow_id']]['remove_me'] = False
+
+ if 'flow_event_name' not in json_dict:
+ return True
if json_dict['flow_event_name'] == 'new':
if 'midstream' in json_dict and json_dict['midstream'] != 0:
@@ -146,12 +154,8 @@ def nDPIsrvd_worker_onJsonLineRecvd(json_dict, current_flow, global_user_data):
elif json_dict['flow_event_name'] == 'detected':
shared_flow_dict[json_dict['flow_id']]['is_detected'] = True
shared_flow_dict[json_dict['flow_id']]['is_guessed'] = False
- shared_flow_dict[json_dict['flow_id']]['is_not_detected'] = False
if 'ndpi' in json_dict and 'flow_risk' in json_dict['ndpi']:
shared_flow_dict[json_dict['flow_id']]['is_risky'] = True
- elif json_dict['flow_event_name'] == 'idle' or \
- json_dict['flow_event_name'] == 'end':
- shared_flow_dict[json_dict['flow_id']]['remove_me'] = True
return True
@@ -165,7 +169,9 @@ def nDPIsrvd_worker(address, nDPIsrvd_global_user_data):
nsock = nDPIsrvdSocket()
nsock.connect(address)
- nsock.loop(nDPIsrvd_worker_onJsonLineRecvd, nDPIsrvd_global_user_data)
+ nsock.loop(nDPIsrvd_worker_onJsonLineRecvd,
+ nDPIsrvd_worker_onFlowCleanup,
+ nDPIsrvd_global_user_data)
if __name__ == '__main__':
diff --git a/examples/py-flow-info/flow-info.py b/examples/py-flow-info/flow-info.py
index 9dfa40957..1f25cea55 100755
--- a/examples/py-flow-info/flow-info.py
+++ b/examples/py-flow-info/flow-info.py
@@ -1,7 +1,9 @@
#!/usr/bin/env python3
import os
+import math
import sys
+import time
sys.path.append(os.path.dirname(sys.argv[0]) + '/../share/nDPId')
sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId')
@@ -16,6 +18,153 @@ except ImportError:
global args
global whois_db
+def set_attr_from_dict(some_object, some_dict, key_and_attr_name, default_value):
+ try:
+ setattr(some_object, key_and_attr_name, some_dict[key_and_attr_name])
+ except KeyError:
+ if default_value is not None and getattr(some_object, key_and_attr_name, None) is None:
+ setattr(some_object, key_and_attr_name, default_value)
+
+def set_attr_if_not_set(some_object, attr_name, value):
+ try:
+ getattr(some_object, attr_name)
+ except AttributeError:
+ setattr(some_object, attr_name, value)
+
+class Stats:
+ last_status_length = 0
+ avg_xfer_json_bytes = 0.0
+ expired_tot_l4_payload_len = 0
+ expired_avg_l4_payload_len = 0
+ total_flows = 0
+ risky_flows = 0
+ midstream_flows = 0
+ guessed_flows = 0
+ not_detected_flows = 0
+ start_time = 0.0
+ current_time = 0.0
+ json_lines = 0
+ spinner_state = 0
+
+ def __init__(self, nDPIsrvd_sock):
+ self.start_time = time.time()
+ self.nsock = nDPIsrvd_sock
+
+ def updateSpinner(self):
+ if self.current_time + 0.25 <= time.time():
+ self.spinner_state += 1
+
+ def getSpinner(self):
+ spinner_states = ['-', '\\', '|', '/']
+ return spinner_states[self.spinner_state % len(spinner_states)]
+
+ def getDataFromJson(self, json_dict, current_flow):
+ if current_flow is None:
+ return
+
+ set_attr_from_dict(current_flow, json_dict, 'flow_tot_l4_payload_len', 0)
+ set_attr_from_dict(current_flow, json_dict, 'flow_avg_l4_payload_len', 0)
+ if 'ndpi' in json_dict:
+ set_attr_from_dict(current_flow, json_dict['ndpi'], 'flow_risk', {})
+ else:
+ set_attr_from_dict(current_flow, {}, 'flow_risk', {})
+ set_attr_from_dict(current_flow, json_dict, 'midstream', 0)
+ set_attr_from_dict(current_flow, json_dict, 'flow_event_name', '')
+ set_attr_if_not_set(current_flow, 'guessed', False)
+ set_attr_if_not_set(current_flow, 'not_detected', False)
+ if current_flow.flow_event_name == 'guessed':
+ current_flow.guessed = True
+ elif current_flow.flow_event_name == 'not-detected':
+ current_flow.not_detected = True
+
+ def update(self, json_dict, current_flow):
+ self.updateSpinner()
+ self.json_lines += 1
+ self.current_time = time.time()
+ self.avg_xfer_json_bytes = self.nsock.received_bytes / (self.current_time - self.start_time)
+ self.getDataFromJson(json_dict, current_flow)
+
+ def updateOnCleanup(self, current_flow):
+ self.total_flows += 1
+ self.expired_tot_l4_payload_len += current_flow.flow_tot_l4_payload_len
+ self.expired_avg_l4_payload_len += current_flow.flow_avg_l4_payload_len
+ self.risky_flows += 1 if len(current_flow.flow_risk) > 0 else 0
+ self.midstream_flows += 1 if current_flow.midstream != 0 else 0
+ self.guessed_flows += 1 if current_flow.guessed is True else 0
+ self.not_detected_flows += 1 if current_flow.not_detected is True else 0
+
+ def getStatsFromFlowMgr(self):
+ alias_count = 0
+ source_count = 0
+ flow_count = 0
+ flow_tot_l4_payload_len = 0.0
+ flow_avg_l4_payload_len = 0.0
+ risky = 0
+ midstream = 0
+ guessed = 0
+ not_detected = 0
+
+ instances = self.nsock.flow_mgr.instances
+ for alias in instances:
+ alias_count += 1
+ for source in instances[alias]:
+ source_count += 1
+ for flow_id in instances[alias][source].flows:
+ flow_count += 1
+ current_flow = instances[alias][source].flows[flow_id]
+
+ flow_tot_l4_payload_len += current_flow.flow_tot_l4_payload_len
+ flow_avg_l4_payload_len += current_flow.flow_avg_l4_payload_len
+ risky += 1 if len(current_flow.flow_risk) > 0 else 0
+ midstream += 1 if current_flow.midstream != 0 else 0
+ guessed += 1 if current_flow.guessed is True else 0
+ not_detected = 1 if current_flow.not_detected is True else 0
+
+ return alias_count, source_count, flow_count, \
+ flow_tot_l4_payload_len, flow_avg_l4_payload_len, \
+ risky, midstream, guessed, not_detected
+
+ @staticmethod
+ def prettifyBytes(bytes_received):
+ size_names = ['B', 'KB', 'MB', 'GB', 'TB']
+ if bytes_received == 0:
+ i = 0
+ else:
+ i = min(int(math.floor(math.log(bytes_received, 1024))), len(size_names) - 1)
+ p = math.pow(1024, i)
+ s = round(bytes_received / p, 2)
+ return '{:.2f} {}'.format(s, size_names[i])
+
+ def resetStatus(self):
+ sys.stdout.write('\r' + str(' ' * self.last_status_length) + '\r')
+ sys.stdout.flush()
+
+ def printStatus(self):
+ alias_count, source_count, flow_count, \
+ tot_l4_payload_len, avg_l4_payload_len, \
+ risky, midstream, guessed, not_detected = self.getStatsFromFlowMgr()
+
+ out_str = '\r[n|tot|avg JSONs: {}|{}|{}/s] [tot|avg l4: {}|{}] ' \
+ '[lss|srcs: {}|{}] ' \
+ '[flws|rsky|mdstrm|!dtctd|gssd: {}|{}|{}|{}|{} / {}|{}|{}|{}|{}] [{}]' \
+ ''.format(self.json_lines,
+ Stats.prettifyBytes(self.nsock.received_bytes),
+ Stats.prettifyBytes(self.avg_xfer_json_bytes),
+ Stats.prettifyBytes(tot_l4_payload_len + self.expired_tot_l4_payload_len),
+ Stats.prettifyBytes(avg_l4_payload_len + self.expired_avg_l4_payload_len),
+ alias_count, source_count,
+ flow_count, risky, midstream, not_detected, guessed,
+ flow_count + self.total_flows,
+ risky + self.risky_flows,
+ midstream + self.midstream_flows,
+ not_detected + self.not_detected_flows,
+ guessed + self.guessed_flows,
+ self.getSpinner())
+ self.last_status_length = len(out_str) - 1 # '\r'
+
+ sys.stdout.write(out_str)
+ sys.stdout.flush()
+
def prettifyEvent(color_list, whitespaces, text):
term_attrs = str()
for color in color_list:
@@ -60,18 +209,35 @@ def whois(ip_str):
return None
return whois_db[ip_str]
-def onJsonLineRecvd(json_dict, current_flow, global_user_data):
+def onFlowCleanup(instance, current_flow, global_user_data):
+ stats = global_user_data
+ stats.updateOnCleanup(current_flow)
+
+ return True
+
+def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
+ stats = global_user_data
+ stats.update(json_dict, current_flow)
+ stats.resetStatus()
+
instance_and_source = ''
- instance_and_source += '[{}]'.format(TermColor.setColorByString(json_dict['alias']))
- instance_and_source += '[{}]'.format(TermColor.setColorByString(json_dict['source']))
+ instance_and_source += '[{}]'.format(TermColor.setColorByString(instance.alias))
+ instance_and_source += '[{}]'.format(TermColor.setColorByString(instance.source))
+ if 'daemon_event_id' in json_dict:
+ print('{} {}: {}'.format(instance_and_source, prettifyEvent([TermColor.WARNING, TermColor.BLINK], 16, 'DAEMON-EVENT'), json_dict['daemon_event_name']))
+ stats.printStatus()
+ return True
if 'basic_event_id' in json_dict:
- print('{} {}: {}'.format(instance_and_source, prettifyEvent([TermColor.WARNING, TermColor.BLINK], 16, 'BASIC-EVENT'), json_dict['basic_event_name']))
+ print('{} {}: {}'.format(instance_and_source, prettifyEvent([TermColor.FAIL, TermColor.BLINK], 16, 'BASIC-EVENT'), json_dict['basic_event_name']))
+ stats.printStatus()
return True
elif 'flow_event_id' not in json_dict:
+ stats.printStatus()
return True
if checkEventFilter(json_dict) is False:
+ stats.printStatus()
return True
ndpi_proto_categ_breed = ''
@@ -99,8 +265,11 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data):
line_suffix = ''
flow_event_name = ''
- if json_dict['flow_event_name'] == 'guessed' or json_dict['flow_event_name'] == 'not-detected':
+ if json_dict['flow_event_name'] == 'guessed':
flow_event_name += '{}{:>16}{}'.format(TermColor.HINT, json_dict['flow_event_name'], TermColor.END)
+ elif json_dict['flow_event_name'] == 'not-detected':
+ flow_event_name += '{}{:>16}{}'.format(TermColor.WARNING + TermColor.BOLD + TermColor.BLINK,
+ json_dict['flow_event_name'], TermColor.END)
else:
if json_dict['flow_event_name'] == 'new':
line_suffix = ''
@@ -145,6 +314,8 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data):
if len(ndpi_frisk) > 0:
print('{} {:>18}{}'.format(instance_and_source, '', ndpi_frisk))
+ stats.printStatus()
+
return True
if __name__ == '__main__':
@@ -169,4 +340,8 @@ if __name__ == '__main__':
nsock = nDPIsrvdSocket()
nsock.connect(address)
- nsock.loop(onJsonLineRecvd, None)
+ stats = Stats(nsock)
+ try:
+ nsock.loop(onJsonLineRecvd, onFlowCleanup, stats)
+ except KeyboardInterrupt:
+ print('\n\nKeyboard Interrupt: cleaned up {} flows.'.format(len(nsock.shutdown())))
diff --git a/examples/py-flow-muliprocess/py-flow-multiprocess.py b/examples/py-flow-muliprocess/py-flow-multiprocess.py
index 91bc693bc..b90ab536d 100755
--- a/examples/py-flow-muliprocess/py-flow-multiprocess.py
+++ b/examples/py-flow-muliprocess/py-flow-multiprocess.py
@@ -19,28 +19,41 @@ def mp_worker(unused, shared_flow_dict):
import time
while True:
s = str()
+ n = int()
+
for key in shared_flow_dict.keys():
- s += '{}, '.format(str(key))
+ try:
+ flow = shared_flow_dict[key]
+ except KeyError:
+ continue
+
+ s += '{}, '.format(str(flow.flow_id))
+ n += 1
+
if len(s) == 0:
s = '-'
else:
s = s[:-2]
- print('Flows: {}'.format(s))
+
+ print('Flows({}): {}'.format(n, s))
time.sleep(1)
-def nDPIsrvd_worker_onJsonLineRecvd(json_dict, current_flow, global_user_data):
+def nDPIsrvd_worker_onFlowCleanup(instance, current_flow, global_user_data):
+ shared_flow_dict = global_user_data
+
+ del shared_flow_dict[current_flow.flow_id]
+
+ return True
+
+
+def nDPIsrvd_worker_onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
shared_flow_dict = global_user_data
- if 'flow_event_name' not in json_dict:
+ if 'flow_id' not in json_dict:
return True
- if json_dict['flow_event_name'] == 'new':
- shared_flow_dict[json_dict['flow_id']] = current_flow
- elif json_dict['flow_event_name'] == 'idle' or \
- json_dict['flow_event_name'] == 'end':
- if json_dict['flow_id'] in shared_flow_dict:
- del shared_flow_dict[json_dict['flow_id']]
+ shared_flow_dict[current_flow.flow_id] = current_flow
return True
@@ -54,7 +67,9 @@ def nDPIsrvd_worker(address, shared_flow_dict):
nsock = nDPIsrvdSocket()
nsock.connect(address)
- nsock.loop(nDPIsrvd_worker_onJsonLineRecvd, shared_flow_dict)
+ nsock.loop(nDPIsrvd_worker_onJsonLineRecvd,
+ nDPIsrvd_worker_onFlowCleanup,
+ shared_flow_dict)
if __name__ == '__main__':
diff --git a/examples/py-ja3-checker/py-ja3-checker.py b/examples/py-ja3-checker/py-ja3-checker.py
index 3e7e9418f..b7f9df5b1 100755
--- a/examples/py-ja3-checker/py-ja3-checker.py
+++ b/examples/py-ja3-checker/py-ja3-checker.py
@@ -105,7 +105,7 @@ def getInfoFromJA3ER(ja3_hash):
print('No fingerprint for JA3 {} found.'.format(ja3_hash))
-def onJsonLineRecvd(json_dict, current_flow, global_user_data):
+def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
if 'tls' in json_dict and 'ja3' in json_dict['tls']:
if json_dict['tls']['client_requested_server_name'] == 'ja3er.com':
@@ -139,7 +139,7 @@ if __name__ == '__main__':
nsock = nDPIsrvdSocket()
nsock.connect(address)
try:
- nsock.loop(onJsonLineRecvd, None)
+ nsock.loop(onJsonLineRecvd, None, None)
except nDPIsrvd.SocketConnectionBroken as err:
sys.stderr.write('\n{}\n'.format(err))
except KeyboardInterrupt:
diff --git a/examples/py-json-stdout/json-stdout.py b/examples/py-json-stdout/json-stdout.py
index 9693ecef6..160a61e0c 100755
--- a/examples/py-json-stdout/json-stdout.py
+++ b/examples/py-json-stdout/json-stdout.py
@@ -13,7 +13,7 @@ except ImportError:
import nDPIsrvd
from nDPIsrvd import nDPIsrvdSocket, TermColor
-def onJsonLineRecvd(json_dict, current_flow, global_user_data):
+def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
print(json_dict)
return True
@@ -27,4 +27,4 @@ if __name__ == '__main__':
nsock = nDPIsrvdSocket()
nsock.connect(address)
- nsock.loop(onJsonLineRecvd, None)
+ nsock.loop(onJsonLineRecvd, None, None)
diff --git a/examples/py-schema-validation/py-schema-validation.py b/examples/py-schema-validation/py-schema-validation.py
index 6273a5aa5..6a07681b3 100755
--- a/examples/py-schema-validation/py-schema-validation.py
+++ b/examples/py-schema-validation/py-schema-validation.py
@@ -18,7 +18,7 @@ class Stats:
print_dot_every = 10
print_nmb_every = print_dot_every * 5
-def onJsonLineRecvd(json_dict, current_flow, global_user_data):
+def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
validation_done = nDPIsrvd.validateAgainstSchema(json_dict)
global_user_data.lines_processed += 1
@@ -45,7 +45,7 @@ if __name__ == '__main__':
nsock = nDPIsrvdSocket()
nsock.connect(address)
try:
- nsock.loop(onJsonLineRecvd, Stats())
+ nsock.loop(onJsonLineRecvd, None, Stats())
except nDPIsrvd.SocketConnectionBroken as err:
sys.stderr.write('\n{}\n'.format(err))
except KeyboardInterrupt:
diff --git a/examples/py-semantic-validation/py-semantic-validation.py b/examples/py-semantic-validation/py-semantic-validation.py
index 4aebb8e09..109a968b3 100755
--- a/examples/py-semantic-validation/py-semantic-validation.py
+++ b/examples/py-semantic-validation/py-semantic-validation.py
@@ -17,14 +17,18 @@ except ImportError:
class Stats:
event_counter = dict()
- lowest_flow_id_for_new_flow = 0
lines_processed = 0
print_dot_every = 10
print_nmb_every = print_dot_every * 5
+ def __init__(self, nDPIsrvd_sock):
+ self.resetEventCounter()
+ self.nsock = nDPIsrvd_sock
+
def resetEventCounter(self):
keys = ['init','reconnect','shutdown', \
- 'new','end','idle','guessed','detected','detection-update','not-detected', \
+ 'new','end','idle','update',
+ 'guessed','detected','detection-update','not-detected', \
'packet', 'packet-flow']
for k in keys:
self.event_counter[k] = 0
@@ -55,7 +59,7 @@ class Stats:
def getEventCounterStr(self):
keys = [ [ 'init','reconnect','shutdown' ], \
- [ 'new','end','idle' ], \
+ [ 'new','end','idle','update' ], \
[ 'guessed','detected','detection-update','not-detected' ], \
[ 'packet', 'packet-flow' ] ]
retval = str()
@@ -64,12 +68,8 @@ class Stats:
for k in klist:
retval += '| {:<16}: {:<4} '.format(k, self.event_counter[k])
retval += '\n--' + '-' * 98 + '\n'
- retval += 'Lowest possible flow id (for new flows): {}\n'.format(self.lowest_flow_id_for_new_flow)
return retval
- def __init__(self):
- self.resetEventCounter()
-
class SemanticValidationException(Exception):
def __init__(self, current_flow, text):
self.text = text
@@ -80,72 +80,139 @@ class SemanticValidationException(Exception):
else:
return 'Flow ID {}: {}'.format(self.current_flow.flow_id, self.text)
-def onJsonLineRecvd(json_dict, current_flow, global_user_data):
- stats = global_user_data
+def onFlowCleanup(instance, current_flow, global_user_data):
+ _, enable_timeout_check, _ = global_user_data
+
+ if type(instance) is not nDPIsrvd.Instance:
+ raise SemanticValidationException(current_flow,
+ 'instance is not of type nDPIsrvd.Instance: ' \
+ '{}'.format(type(instance)))
+ if type(current_flow) is not nDPIsrvd.Flow:
+ raise SemanticValidationException(current_flow,
+ 'current_flow is not of type nDPIsrvd.Flow: ' \
+ '{}'.format(type(current_flow)))
+ if type(global_user_data) is not tuple:
+ raise SemanticValidationException(current_flow,
+ 'global_user_data is not of type tuple: ' \
+ '{}'.format(type(global_user_data)))
+
+ if current_flow.cleanup_reason == nDPIsrvd.FlowManager.CLEANUP_REASON_INVALID:
+ raise SemanticValidationException(current_flow,
+ 'Invalid flow cleanup reason')
+
+ if current_flow.cleanup_reason == nDPIsrvd.FlowManager.CLEANUP_REASON_FLOW_TIMEOUT:
+ raise SemanticValidationException(current_flow,
+ 'Unexpected flow cleanup reason: CLEANUP_REASON_FLOW_TIMEOUT')
+
+ if enable_timeout_check is True:
+ try:
+ l4_proto = current_flow.l4_proto
+ except AttributeError:
+ l4_proto = 'n/a'
+
+ invalid_flows = stats.nsock.verify()
+ if len(invalid_flows) > 0:
+ invalid_flows_str = ''
+ for flow_id in invalid_flows:
+ flow = instance.flows[flow_id]
+ try:
+ l4_proto = flow.l4_proto
+ except AttributeError:
+ l4_proto = 'n/a'
+ invalid_flows_str += '{} proto[{},{}] ts[{} + {} < {}] diff[{}], '.format(flow_id, l4_proto, flow.flow_idle_time,
+ flow.flow_last_seen, flow.flow_idle_time,
+ instance.most_recent_flow_time,
+ instance.most_recent_flow_time -
+ (flow.flow_last_seen + flow.flow_idle_time))
+
+ raise SemanticValidationException(None, 'Flow Manager verification failed for: {}'.format(invalid_flows_str[:-2]))
+
+ return True
+
+class ThreadData(object):
+ lowest_possible_flow_id = 0
+ lowest_possible_packet_id = 0
+
+def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
+ _, _, stats = global_user_data
stats.incrementEventCounter(json_dict)
- # dictionary unique for every flow, useful for flow specific semantic validation
+ if type(instance) is not nDPIsrvd.Instance:
+ raise SemanticValidationException(current_flow,
+ 'instance is not of type nDPIsrvd.Instance: ' \
+ '{}'.format(type(instance)))
+ if type(current_flow) is not nDPIsrvd.Flow and current_flow is not None:
+ raise SemanticValidationException(current_flow,
+ 'current_flow is not of type nDPIsrvd.Flow: ' \
+ '{}'.format(type(current_flow)))
+ if type(global_user_data) is not tuple:
+ raise SemanticValidationException(current_flow,
+ 'global_user_data is not of type tuple: ' \
+ '{}'.format(type(global_user_data)))
+ if type(stats) is not Stats:
+ raise SemanticValidationException(current_flow,
+ 'stats is not of type Stats: ' \
+ '{}'.format(type(stats)))
+
try:
- semdict = current_flow.semdict
+ thread_data_dict = instance.thread_data
except AttributeError:
- try:
- semdict = current_flow.semdict = dict()
- except AttributeError:
- semdict = dict()
+ thread_data_dict = instance.thread_data = dict()
- if 'current_flow' in semdict:
- if semdict['current_flow'] != current_flow:
- raise SemanticValidationException(current_flow,
- 'Semantic dictionary flow reference != current flow reference: ' \
- '{} != {}'.format(semdict['current_flow'], current_flow))
+ if json_dict['thread_id'] in thread_data_dict:
+ td = thread_data_dict[json_dict['thread_id']]
else:
- semdict['current_flow'] = current_flow
+ td = thread_data_dict[json_dict['thread_id']] = ThreadData()
+
+ lowest_possible_flow_id = td.lowest_possible_flow_id
+ lowest_possible_packet_id = td.lowest_possible_packet_id
if current_flow is not None:
- if 'pkt_ts_sec' in json_dict:
- current_flow.last_pkt_seen = int(json_dict['pkt_ts_sec'] * 1000.0)
- if 'pkt_ts_usec' in json_dict:
- current_flow.last_pkt_seen += int(json_dict['pkt_ts_usec'] / 1000.0)
- else:
- raise SemanticValidationException(current_flow,
- 'Got pkt_ts_sec but no pkt_ts_usec for packet id ' \
- '{}'.format(json_dict['packet_id']))
- if 'flow_id' in semdict:
- semdict_thread_key = 'thread' + str(json_dict['thread_id'])
- if semdict_thread_key in semdict:
- if semdict[semdict_thread_key]['lowest_packet_id'] > json_dict['packet_id']:
- raise SemanticValidationException(current_flow,
- 'Invalid packet id for thread {} received: ' \
- 'expected packet id lesser or equal {}, ' \
- 'got {}'.format(json_dict['thread_id'],
- semdict[semdict_thread_key]['lowest_packet_id'],
- json_dict['packet_id']))
- else:
- semdict[semdict_thread_key] = dict()
- semdict[semdict_thread_key]['lowest_packet_id'] = json_dict['packet_id']
-
- if semdict['flow_id'] != current_flow.flow_id or \
- semdict['flow_id'] != json_dict['flow_id']:
- raise SemanticValidationException(current_flow,
- 'Semantic dictionary flow id != current flow id != JSON dictionary flow id: ' \
- '{} != {} != {}'.format(semdict['flow_id'], \
- current_flow.flow_id, json_dict['flow_id']))
- else:
- if json_dict['flow_id'] != current_flow.flow_id:
- raise SemanticValidationException(current_flow,
- 'JSON dictionary flow id != current flow id: ' \
- '{} != {}'.format(json_dict['flow_id'], current_flow.flow_id))
- semdict['flow_id'] = json_dict['flow_id']
+ if instance.flows[current_flow.flow_id] != current_flow:
+ raise SemanticValidationException(current_flow,
+ 'FlowManager flow reference != current flow reference: ' \
+ '{} != {}'.format(instance.flows[current_flow.flow_id], current_flow))
+
+ if 'l4_proto' in json_dict:
+ try:
+ l4_proto = current_flow.l4_proto
+ except AttributeError:
+ l4_proto = current_flow.l4_proto = json_dict['l4_proto']
+
+ if l4_proto != json_dict['l4_proto']:
+ raise SemanticValidationException(current_flow, 'Layer4 protocol mismatch: {} != {}'.format(l4_proto, json_dict['l4_proto']))
+ elif json_dict['packet_event_name'] != 'packet-flow':
+ raise SemanticValidationException(current_flow, 'Layer4 protocol not found in JSON')
+
+ if 'flow_last_seen' in json_dict:
+ if json_dict['flow_last_seen'] != current_flow.flow_last_seen:
+ raise SemanticValidationException(current_flow, 'Flow last seen: {} != {}'.format(json_dict['flow_last_seen'],
+ current_flow.flow_last_seen))
+
+ if 'flow_idle_time' in json_dict:
+ if json_dict['flow_idle_time'] != current_flow.flow_idle_time:
+ raise SemanticValidationException(current_flow, 'Flow idle time mismatch: {} != {}'.format(json_dict['flow_idle_time'],
+ current_flow.flow_idle_time))
+
+ if ('flow_last_seen' in json_dict and 'flow_idle_time' not in json_dict) or \
+ ('flow_last_seen' not in json_dict and 'flow_idle_time' in json_dict):
+ raise SemanticValidationException(current_flow,
+ 'Got a JSON string with only one of both keys, ' \
+ 'both required for timeout handling:' \
+ 'flow_last_seen, flow_idle_time')
+
+ if 'ts_msec' in json_dict:
+ current_flow.ts_msec = int(json_dict['ts_msec'])
if 'flow_packet_id' in json_dict:
try:
- if json_dict['flow_packet_id'] != current_flow.low_packet_id + 1:
+ if json_dict['flow_packet_id'] != current_flow.flow_packet_id + 1:
raise SemanticValidationException(current_flow,
'Invalid flow_packet_id seen, expected {}, got ' \
- '{}'.format(current_flow.low_packet_id + 1, json_dict['flow_packet_id']))
+ '{}'.format(current_flow.flow_packet_id + 1, json_dict['flow_packet_id']))
else:
- current_flow.low_packet_id += 1
+ current_flow.flow_packet_id += 1
except AttributeError:
pass
@@ -156,14 +223,32 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data):
except AttributeError:
pass
+ if 'packet_event_name' in json_dict:
+ if json_dict['packet_event_name'] == 'packet-flow':
+ if lowest_possible_packet_id > json_dict['packet_id']:
+ raise SemanticValidationException(current_flow,
+ 'Invalid packet id for thread {} received: ' \
+ 'expected packet id lesser or equal {}, ' \
+ 'got {}'.format(json_dict['thread_id'],
+ lowest_possible_packet_id,
+ json_dict['packet_id']))
+ td.lowest_possible_packet_id = lowest_possible_packet_id
+
+ if 'flow_id' in json_dict:
+ if current_flow.flow_id != json_dict['flow_id']:
+ raise SemanticValidationException(current_flow,
+ 'Current flow id != JSON dictionary flow id: ' \
+ '{} != {}'.format(current_flow.flow_id, json_dict['flow_id']))
+
if 'flow_event_name' in json_dict:
try:
- if json_dict['flow_first_seen'] > current_flow.last_pkt_seen or \
- json_dict['flow_last_seen'] < current_flow.last_pkt_seen:
+ if json_dict['flow_first_seen'] > current_flow.ts_msec or \
+ json_dict['flow_last_seen'] > current_flow.ts_msec or \
+ json_dict['flow_first_seen'] > json_dict['flow_last_seen']:
raise SemanticValidationException(current_flow,
'Last packet timestamp is invalid: ' \
- 'first_seen({}) <= {} <= last_seen({})'.format(json_dict['flow_first_seen'],
- current_flow.last_pkt_seen,
+ 'first_seen({}) <= {} >= last_seen({})'.format(json_dict['flow_first_seen'],
+ current_flow.ts_msec,
json_dict['flow_last_seen']))
except AttributeError:
if json_dict['flow_event_name'] == 'new':
@@ -173,10 +258,10 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data):
json_dict['flow_event_name'] == 'idle':
current_flow.flow_ended = True
elif json_dict['flow_event_name'] == 'new':
- if stats.lowest_flow_id_for_new_flow > current_flow.flow_id:
+ if lowest_possible_flow_id > current_flow.flow_id:
raise SemanticValidationException(current_flow,
'JSON dictionary lowest flow id for new flow > current flow id: ' \
- '{} != {}'.format(stats.lowest_flow_id_for_new_flow, current_flow.flow_id))
+ '{} != {}'.format(lowest_possible_flow_id, current_flow.flow_id))
try:
if current_flow.flow_new_seen == True:
raise SemanticValidationException(current_flow,
@@ -185,8 +270,8 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data):
pass
current_flow.flow_new_seen = True
current_flow.flow_packet_id = 0
- if stats.lowest_flow_id_for_new_flow == 0:
- stats.lowest_flow_id_for_new_flow = current_flow.flow_id
+ if lowest_possible_flow_id == 0:
+ td.lowest_possible_flow_id = current_flow.flow_id
elif json_dict['flow_event_name'] == 'detected' or \
json_dict['flow_event_name'] == 'not-detected':
try:
@@ -198,19 +283,19 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data):
current_flow.flow_detection_finished = True
try:
- if current_flow.flow_new_seen is True and stats.lowest_flow_id_for_new_flow > current_flow.flow_id:
+ if current_flow.flow_new_seen is True and lowest_possible_flow_id > current_flow.flow_id:
raise SemanticValidationException(current_flow, 'Lowest flow id for flow > current flow id: ' \
- '{} > {}'.format(stats.lowest_flow_id_for_new_flow, current_flow.flow_id))
+ '{} > {}'.format(lowest_possible_flow_id, current_flow.flow_id))
except AttributeError:
pass
- global_user_data.lines_processed += 1
- if global_user_data.lines_processed % global_user_data.print_dot_every == 0:
+ stats.lines_processed += 1
+ if stats.lines_processed % stats.print_dot_every == 0:
sys.stdout.write('.')
sys.stdout.flush()
- print_nmb_every = global_user_data.print_nmb_every + (len(str(global_user_data.lines_processed)) * global_user_data.print_dot_every)
- if global_user_data.lines_processed % print_nmb_every == 0:
- sys.stdout.write(str(global_user_data.lines_processed))
+ print_nmb_every = stats.print_nmb_every + (len(str(stats.lines_processed)) * stats.print_dot_every)
+ if stats.lines_processed % print_nmb_every == 0:
+ sys.stdout.write(str(stats.lines_processed))
sys.stdout.flush()
return True
@@ -218,6 +303,8 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data):
if __name__ == '__main__':
argparser = nDPIsrvd.defaultArgumentParser()
argparser.add_argument('--strict', action='store_true', default=False, help='Require and validate a full nDPId application lifecycle.')
+ argparser.add_argument('--enable-timeout-check', action='store_true', default=False,
+ help='Enable additional flow timeout validation. See README.md for more information')
args = argparser.parse_args()
address = nDPIsrvd.validateAddress(args)
@@ -226,9 +313,9 @@ if __name__ == '__main__':
nsock = nDPIsrvdSocket()
nsock.connect(address)
- stats = Stats()
+ stats = Stats(nsock)
try:
- nsock.loop(onJsonLineRecvd, stats)
+ nsock.loop(onJsonLineRecvd, onFlowCleanup, (args.strict, args.enable_timeout_check, stats))
except nDPIsrvd.SocketConnectionBroken as err:
sys.stderr.write('\n{}\n'.format(err))
except KeyboardInterrupt: