aboutsummaryrefslogtreecommitdiff
path: root/examples/py-semantic-validation
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2021-04-14 21:39:23 +0200
committerToni Uhlig <matzeton@googlemail.com>2021-04-14 22:04:42 +0200
commitf713ec702bd367f14c6ff75ea89f3c155c65a904 (patch)
tree320099fa4006aba70fa1854aaadb68344002c5f1 /examples/py-semantic-validation
parent514c4279170bde53a2969e1074a48ddd658d48ff (diff)
Added nDPId semantic validation test.
* fixed inconsistent processing of remaining flows during nDPId shutdown phase * fixed multiple `detected' flow events (instead only `detection-update' flow events can occur after a `detected' flow event) * fixed nDPIsrvd.py invalid message buffer handling * improved run_tests.sh so only valid pcap capture files are getting processed (and some more cosmetics + logging) Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'examples/py-semantic-validation')
-rwxr-xr-xexamples/py-semantic-validation/py-semantic-validation.py182
1 files changed, 182 insertions, 0 deletions
diff --git a/examples/py-semantic-validation/py-semantic-validation.py b/examples/py-semantic-validation/py-semantic-validation.py
new file mode 100755
index 000000000..d4423467e
--- /dev/null
+++ b/examples/py-semantic-validation/py-semantic-validation.py
@@ -0,0 +1,182 @@
+#!/usr/bin/env python3
+
+import os
+import sys
+
+sys.path.append(os.path.dirname(sys.argv[0]) + '/../share/nDPId')
+sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId')
+try:
+ import nDPIsrvd
+ from nDPIsrvd import nDPIsrvdSocket, TermColor
+except ImportError:
+ sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
+ import nDPIsrvd
+ from nDPIsrvd import nDPIsrvdSocket, TermColor
+
+global lowest_flow_id_for_new_flow
+lowest_flow_id_for_new_flow = 0
+
+class Stats:
+ event_counter = dict()
+
+ lines_processed = 0
+ print_dot_every = 10
+ print_nmb_every = print_dot_every * 5
+
+ def resetEventCounter(self):
+ keys = ['init','reconnect','shutdown', \
+ 'new','end','idle','guessed','detected','detection-update','not-detected', \
+ 'packet', 'packet-flow']
+ for k in keys:
+ self.event_counter[k] = 0
+
+ def incrementEventCounter(self, json_dict):
+ try:
+ if 'daemon_event_name' in json_dict:
+ self.event_counter[json_dict['daemon_event_name']] += 1
+ if 'flow_event_name' in json_dict:
+ self.event_counter[json_dict['flow_event_name']] += 1
+ if 'packet_event_name' in json_dict:
+ self.event_counter[json_dict['packet_event_name']] += 1
+ except KeyError as e:
+ raise RuntimeError('Semantic validation failed for event counter '
+ 'which received an invalid key: {}'.format(str(e)))
+
+ def verifyEventCounter(self):
+ if self.event_counter['shutdown'] != self.event_counter['init'] or self.event_counter['init'] == 0:
+ return False
+ if self.event_counter['new'] != self.event_counter['end'] + self.event_counter['idle']:
+ return False
+ if self.event_counter['new'] < self.event_counter['detected'] + self.event_counter['not-detected']:
+ return False
+ if self.event_counter['new'] < self.event_counter['guessed'] + self.event_counter['not-detected']:
+ return False
+
+ return True
+
+ def getEventCounterStr(self):
+ keys = [ [ 'init','reconnect','shutdown' ], \
+ [ 'new','end','idle' ], \
+ [ 'guessed','detected','detection-update','not-detected' ], \
+ [ 'packet', 'packet-flow' ] ]
+ retval = str()
+ retval += '-' * 98 + '--\n'
+ for klist in keys:
+ for k in klist:
+ retval += '| {:<16}: {:<4} '.format(k, self.event_counter[k])
+ retval += '\n--' + '-' * 98 + '\n'
+ return retval
+
+ def __init__(self):
+ self.resetEventCounter()
+
+class SemanticValidationException(Exception):
+ def __init__(self, current_flow, text):
+ self.text = text
+ self.current_flow = current_flow
+ def __str__(self):
+ if self.current_flow is None:
+ return '{}'.format(self.text)
+ else:
+ return 'Flow ID {}: {}'.format(self.current_flow.flow_id, self.text)
+
+def onJsonLineRecvd(json_dict, current_flow, global_user_data):
+ global lowest_flow_id_for_new_flow
+ stats = global_user_data
+ stats.incrementEventCounter(json_dict)
+
+ try:
+ semdict = current_flow.semdict
+ except AttributeError:
+ try:
+ semdict = current_flow.semdict = dict()
+ except AttributeError:
+ semdict = dict()
+
+ if 'current_flow' in semdict:
+ if semdict['current_flow'] != current_flow:
+ raise SemanticValidationException(current_flow,
+ 'Semantic dictionary flow reference != current flow reference: ' \
+ '{} != {}'.format(semdict['current_flow'], current_flow))
+ else:
+ semdict['current_flow'] = current_flow
+
+ if current_flow is not None:
+ if 'flow_id' in semdict:
+ if semdict['flow_id'] != current_flow.flow_id or \
+ semdict['flow_id'] != json_dict['flow_id']:
+ raise SemanticValidationException(current_flow,
+ 'Semantic dictionary flow id != current flow id != JSON dictionary flow id: ' \
+ '{} != {} != {}'.format(semdict['flow_id'], \
+ current_flow.flow_id, json_dict['flow_id']))
+ else:
+ if json_dict['flow_id'] != current_flow.flow_id:
+ raise SemanticValidationException(current_flow,
+ 'JSON dictionary flow id != current flow id: ' \
+ '{} != {}'.format(json_dict['flow_id'], current_flow.flow_id))
+ semdict['flow_id'] = json_dict['flow_id']
+
+ if 'flow_event_name' in json_dict:
+ if json_dict['flow_event_name'] == 'end' or \
+ json_dict['flow_event_name'] == 'idle':
+ pass
+ elif json_dict['flow_event_name'] == 'new':
+ if lowest_flow_id_for_new_flow > current_flow.flow_id:
+ raise SemanticValidationException(current_flow,
+ 'JSON dictionary lowest flow id for new flow > current flow id: ' \
+ '{} != {}'.format(lowest_flow_id_for_new_flow, current_flow.flow_id))
+ current_flow.flow_new_seen = True
+ if lowest_flow_id_for_new_flow == 0:
+ lowest_flow_id_for_new_flow = current_flow.flow_id
+ elif json_dict['flow_event_name'] == 'detected' or \
+ json_dict['flow_event_name'] == 'not-detected':
+ try:
+ if current_flow.flow_detection_finished is True:
+ raise SemanticValidationException(current_flow,
+ 'Flow detection already finished, but detected/not-detected event received.')
+ except AttributeError:
+ pass
+ current_flow.flow_detection_finished = True
+
+ try:
+ if current_flow.flow_new_seen is True and lowest_flow_id_for_new_flow > current_flow.flow_id:
+ raise SemanticValidationException(current_flow, 'Lowest flow id for flow > current flow id: ' \
+ '{} > {}'.format(lowest_flow_id_for_new_flow, current_flow.flow_id))
+ except AttributeError:
+ pass
+
+ global_user_data.lines_processed += 1
+ if global_user_data.lines_processed % global_user_data.print_dot_every == 0:
+ sys.stdout.write('.')
+ sys.stdout.flush()
+ print_nmb_every = global_user_data.print_nmb_every + (len(str(global_user_data.lines_processed)) * global_user_data.print_dot_every)
+ if global_user_data.lines_processed % print_nmb_every == 0:
+ sys.stdout.write(str(global_user_data.lines_processed))
+ sys.stdout.flush()
+
+ return True
+
+if __name__ == '__main__':
+ argparser = nDPIsrvd.defaultArgumentParser()
+ argparser.add_argument('--strict', action='store_true', default=False, help='Require and validate a full nDPId application lifecycle.')
+ args = argparser.parse_args()
+ address = nDPIsrvd.validateAddress(args)
+
+ sys.stderr.write('Recv buffer size: {}\n'.format(nDPIsrvd.NETWORK_BUFFER_MAX_SIZE))
+ sys.stderr.write('Connecting to {} ..\n'.format(address[0]+':'+str(address[1]) if type(address) is tuple else address))
+
+ nsock = nDPIsrvdSocket()
+ nsock.connect(address)
+ stats = Stats()
+ try:
+ nsock.loop(onJsonLineRecvd, stats)
+ except nDPIsrvd.SocketConnectionBroken as err:
+ sys.stderr.write('\n{}\n'.format(err))
+ except KeyboardInterrupt:
+ print()
+
+ sys.stderr.write('\nEvent counter:\n' + stats.getEventCounterStr() + '\n')
+ if args.strict is True:
+ if stats.verifyEventCounter() is False:
+ sys.stderr.write('Event counter verification failed. (`--strict\')\n')
+ sys.exit(1)