diff options
Diffstat (limited to 'examples/py-semantic-validation/py-semantic-validation.py')
-rwxr-xr-x | examples/py-semantic-validation/py-semantic-validation.py | 237 |
1 files changed, 162 insertions, 75 deletions
diff --git a/examples/py-semantic-validation/py-semantic-validation.py b/examples/py-semantic-validation/py-semantic-validation.py index 4aebb8e09..109a968b3 100755 --- a/examples/py-semantic-validation/py-semantic-validation.py +++ b/examples/py-semantic-validation/py-semantic-validation.py @@ -17,14 +17,18 @@ except ImportError: class Stats: event_counter = dict() - lowest_flow_id_for_new_flow = 0 lines_processed = 0 print_dot_every = 10 print_nmb_every = print_dot_every * 5 + def __init__(self, nDPIsrvd_sock): + self.resetEventCounter() + self.nsock = nDPIsrvd_sock + def resetEventCounter(self): keys = ['init','reconnect','shutdown', \ - 'new','end','idle','guessed','detected','detection-update','not-detected', \ + 'new','end','idle','update', + 'guessed','detected','detection-update','not-detected', \ 'packet', 'packet-flow'] for k in keys: self.event_counter[k] = 0 @@ -55,7 +59,7 @@ class Stats: def getEventCounterStr(self): keys = [ [ 'init','reconnect','shutdown' ], \ - [ 'new','end','idle' ], \ + [ 'new','end','idle','update' ], \ [ 'guessed','detected','detection-update','not-detected' ], \ [ 'packet', 'packet-flow' ] ] retval = str() @@ -64,12 +68,8 @@ class Stats: for k in klist: retval += '| {:<16}: {:<4} '.format(k, self.event_counter[k]) retval += '\n--' + '-' * 98 + '\n' - retval += 'Lowest possible flow id (for new flows): {}\n'.format(self.lowest_flow_id_for_new_flow) return retval - def __init__(self): - self.resetEventCounter() - class SemanticValidationException(Exception): def __init__(self, current_flow, text): self.text = text @@ -80,72 +80,139 @@ class SemanticValidationException(Exception): else: return 'Flow ID {}: {}'.format(self.current_flow.flow_id, self.text) -def onJsonLineRecvd(json_dict, current_flow, global_user_data): - stats = global_user_data +def onFlowCleanup(instance, current_flow, global_user_data): + _, enable_timeout_check, _ = global_user_data + + if type(instance) is not nDPIsrvd.Instance: + raise SemanticValidationException(current_flow, + 'instance is not of type nDPIsrvd.Instance: ' \ + '{}'.format(type(instance))) + if type(current_flow) is not nDPIsrvd.Flow: + raise SemanticValidationException(current_flow, + 'current_flow is not of type nDPIsrvd.Flow: ' \ + '{}'.format(type(current_flow))) + if type(global_user_data) is not tuple: + raise SemanticValidationException(current_flow, + 'global_user_data is not of type tuple: ' \ + '{}'.format(type(global_user_data))) + + if current_flow.cleanup_reason == nDPIsrvd.FlowManager.CLEANUP_REASON_INVALID: + raise SemanticValidationException(current_flow, + 'Invalid flow cleanup reason') + + if current_flow.cleanup_reason == nDPIsrvd.FlowManager.CLEANUP_REASON_FLOW_TIMEOUT: + raise SemanticValidationException(current_flow, + 'Unexpected flow cleanup reason: CLEANUP_REASON_FLOW_TIMEOUT') + + if enable_timeout_check is True: + try: + l4_proto = current_flow.l4_proto + except AttributeError: + l4_proto = 'n/a' + + invalid_flows = stats.nsock.verify() + if len(invalid_flows) > 0: + invalid_flows_str = '' + for flow_id in invalid_flows: + flow = instance.flows[flow_id] + try: + l4_proto = flow.l4_proto + except AttributeError: + l4_proto = 'n/a' + invalid_flows_str += '{} proto[{},{}] ts[{} + {} < {}] diff[{}], '.format(flow_id, l4_proto, flow.flow_idle_time, + flow.flow_last_seen, flow.flow_idle_time, + instance.most_recent_flow_time, + instance.most_recent_flow_time - + (flow.flow_last_seen + flow.flow_idle_time)) + + raise SemanticValidationException(None, 'Flow Manager verification failed for: {}'.format(invalid_flows_str[:-2])) + + return True + +class ThreadData(object): + lowest_possible_flow_id = 0 + lowest_possible_packet_id = 0 + +def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data): + _, _, stats = global_user_data stats.incrementEventCounter(json_dict) - # dictionary unique for every flow, useful for flow specific semantic validation + if type(instance) is not nDPIsrvd.Instance: + raise SemanticValidationException(current_flow, + 'instance is not of type nDPIsrvd.Instance: ' \ + '{}'.format(type(instance))) + if type(current_flow) is not nDPIsrvd.Flow and current_flow is not None: + raise SemanticValidationException(current_flow, + 'current_flow is not of type nDPIsrvd.Flow: ' \ + '{}'.format(type(current_flow))) + if type(global_user_data) is not tuple: + raise SemanticValidationException(current_flow, + 'global_user_data is not of type tuple: ' \ + '{}'.format(type(global_user_data))) + if type(stats) is not Stats: + raise SemanticValidationException(current_flow, + 'stats is not of type Stats: ' \ + '{}'.format(type(stats))) + try: - semdict = current_flow.semdict + thread_data_dict = instance.thread_data except AttributeError: - try: - semdict = current_flow.semdict = dict() - except AttributeError: - semdict = dict() + thread_data_dict = instance.thread_data = dict() - if 'current_flow' in semdict: - if semdict['current_flow'] != current_flow: - raise SemanticValidationException(current_flow, - 'Semantic dictionary flow reference != current flow reference: ' \ - '{} != {}'.format(semdict['current_flow'], current_flow)) + if json_dict['thread_id'] in thread_data_dict: + td = thread_data_dict[json_dict['thread_id']] else: - semdict['current_flow'] = current_flow + td = thread_data_dict[json_dict['thread_id']] = ThreadData() + + lowest_possible_flow_id = td.lowest_possible_flow_id + lowest_possible_packet_id = td.lowest_possible_packet_id if current_flow is not None: - if 'pkt_ts_sec' in json_dict: - current_flow.last_pkt_seen = int(json_dict['pkt_ts_sec'] * 1000.0) - if 'pkt_ts_usec' in json_dict: - current_flow.last_pkt_seen += int(json_dict['pkt_ts_usec'] / 1000.0) - else: - raise SemanticValidationException(current_flow, - 'Got pkt_ts_sec but no pkt_ts_usec for packet id ' \ - '{}'.format(json_dict['packet_id'])) - if 'flow_id' in semdict: - semdict_thread_key = 'thread' + str(json_dict['thread_id']) - if semdict_thread_key in semdict: - if semdict[semdict_thread_key]['lowest_packet_id'] > json_dict['packet_id']: - raise SemanticValidationException(current_flow, - 'Invalid packet id for thread {} received: ' \ - 'expected packet id lesser or equal {}, ' \ - 'got {}'.format(json_dict['thread_id'], - semdict[semdict_thread_key]['lowest_packet_id'], - json_dict['packet_id'])) - else: - semdict[semdict_thread_key] = dict() - semdict[semdict_thread_key]['lowest_packet_id'] = json_dict['packet_id'] - - if semdict['flow_id'] != current_flow.flow_id or \ - semdict['flow_id'] != json_dict['flow_id']: - raise SemanticValidationException(current_flow, - 'Semantic dictionary flow id != current flow id != JSON dictionary flow id: ' \ - '{} != {} != {}'.format(semdict['flow_id'], \ - current_flow.flow_id, json_dict['flow_id'])) - else: - if json_dict['flow_id'] != current_flow.flow_id: - raise SemanticValidationException(current_flow, - 'JSON dictionary flow id != current flow id: ' \ - '{} != {}'.format(json_dict['flow_id'], current_flow.flow_id)) - semdict['flow_id'] = json_dict['flow_id'] + if instance.flows[current_flow.flow_id] != current_flow: + raise SemanticValidationException(current_flow, + 'FlowManager flow reference != current flow reference: ' \ + '{} != {}'.format(instance.flows[current_flow.flow_id], current_flow)) + + if 'l4_proto' in json_dict: + try: + l4_proto = current_flow.l4_proto + except AttributeError: + l4_proto = current_flow.l4_proto = json_dict['l4_proto'] + + if l4_proto != json_dict['l4_proto']: + raise SemanticValidationException(current_flow, 'Layer4 protocol mismatch: {} != {}'.format(l4_proto, json_dict['l4_proto'])) + elif json_dict['packet_event_name'] != 'packet-flow': + raise SemanticValidationException(current_flow, 'Layer4 protocol not found in JSON') + + if 'flow_last_seen' in json_dict: + if json_dict['flow_last_seen'] != current_flow.flow_last_seen: + raise SemanticValidationException(current_flow, 'Flow last seen: {} != {}'.format(json_dict['flow_last_seen'], + current_flow.flow_last_seen)) + + if 'flow_idle_time' in json_dict: + if json_dict['flow_idle_time'] != current_flow.flow_idle_time: + raise SemanticValidationException(current_flow, 'Flow idle time mismatch: {} != {}'.format(json_dict['flow_idle_time'], + current_flow.flow_idle_time)) + + if ('flow_last_seen' in json_dict and 'flow_idle_time' not in json_dict) or \ + ('flow_last_seen' not in json_dict and 'flow_idle_time' in json_dict): + raise SemanticValidationException(current_flow, + 'Got a JSON string with only one of both keys, ' \ + 'both required for timeout handling:' \ + 'flow_last_seen, flow_idle_time') + + if 'ts_msec' in json_dict: + current_flow.ts_msec = int(json_dict['ts_msec']) if 'flow_packet_id' in json_dict: try: - if json_dict['flow_packet_id'] != current_flow.low_packet_id + 1: + if json_dict['flow_packet_id'] != current_flow.flow_packet_id + 1: raise SemanticValidationException(current_flow, 'Invalid flow_packet_id seen, expected {}, got ' \ - '{}'.format(current_flow.low_packet_id + 1, json_dict['flow_packet_id'])) + '{}'.format(current_flow.flow_packet_id + 1, json_dict['flow_packet_id'])) else: - current_flow.low_packet_id += 1 + current_flow.flow_packet_id += 1 except AttributeError: pass @@ -156,14 +223,32 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data): except AttributeError: pass + if 'packet_event_name' in json_dict: + if json_dict['packet_event_name'] == 'packet-flow': + if lowest_possible_packet_id > json_dict['packet_id']: + raise SemanticValidationException(current_flow, + 'Invalid packet id for thread {} received: ' \ + 'expected packet id lesser or equal {}, ' \ + 'got {}'.format(json_dict['thread_id'], + lowest_possible_packet_id, + json_dict['packet_id'])) + td.lowest_possible_packet_id = lowest_possible_packet_id + + if 'flow_id' in json_dict: + if current_flow.flow_id != json_dict['flow_id']: + raise SemanticValidationException(current_flow, + 'Current flow id != JSON dictionary flow id: ' \ + '{} != {}'.format(current_flow.flow_id, json_dict['flow_id'])) + if 'flow_event_name' in json_dict: try: - if json_dict['flow_first_seen'] > current_flow.last_pkt_seen or \ - json_dict['flow_last_seen'] < current_flow.last_pkt_seen: + if json_dict['flow_first_seen'] > current_flow.ts_msec or \ + json_dict['flow_last_seen'] > current_flow.ts_msec or \ + json_dict['flow_first_seen'] > json_dict['flow_last_seen']: raise SemanticValidationException(current_flow, 'Last packet timestamp is invalid: ' \ - 'first_seen({}) <= {} <= last_seen({})'.format(json_dict['flow_first_seen'], - current_flow.last_pkt_seen, + 'first_seen({}) <= {} >= last_seen({})'.format(json_dict['flow_first_seen'], + current_flow.ts_msec, json_dict['flow_last_seen'])) except AttributeError: if json_dict['flow_event_name'] == 'new': @@ -173,10 +258,10 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data): json_dict['flow_event_name'] == 'idle': current_flow.flow_ended = True elif json_dict['flow_event_name'] == 'new': - if stats.lowest_flow_id_for_new_flow > current_flow.flow_id: + if lowest_possible_flow_id > current_flow.flow_id: raise SemanticValidationException(current_flow, 'JSON dictionary lowest flow id for new flow > current flow id: ' \ - '{} != {}'.format(stats.lowest_flow_id_for_new_flow, current_flow.flow_id)) + '{} != {}'.format(lowest_possible_flow_id, current_flow.flow_id)) try: if current_flow.flow_new_seen == True: raise SemanticValidationException(current_flow, @@ -185,8 +270,8 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data): pass current_flow.flow_new_seen = True current_flow.flow_packet_id = 0 - if stats.lowest_flow_id_for_new_flow == 0: - stats.lowest_flow_id_for_new_flow = current_flow.flow_id + if lowest_possible_flow_id == 0: + td.lowest_possible_flow_id = current_flow.flow_id elif json_dict['flow_event_name'] == 'detected' or \ json_dict['flow_event_name'] == 'not-detected': try: @@ -198,19 +283,19 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data): current_flow.flow_detection_finished = True try: - if current_flow.flow_new_seen is True and stats.lowest_flow_id_for_new_flow > current_flow.flow_id: + if current_flow.flow_new_seen is True and lowest_possible_flow_id > current_flow.flow_id: raise SemanticValidationException(current_flow, 'Lowest flow id for flow > current flow id: ' \ - '{} > {}'.format(stats.lowest_flow_id_for_new_flow, current_flow.flow_id)) + '{} > {}'.format(lowest_possible_flow_id, current_flow.flow_id)) except AttributeError: pass - global_user_data.lines_processed += 1 - if global_user_data.lines_processed % global_user_data.print_dot_every == 0: + stats.lines_processed += 1 + if stats.lines_processed % stats.print_dot_every == 0: sys.stdout.write('.') sys.stdout.flush() - print_nmb_every = global_user_data.print_nmb_every + (len(str(global_user_data.lines_processed)) * global_user_data.print_dot_every) - if global_user_data.lines_processed % print_nmb_every == 0: - sys.stdout.write(str(global_user_data.lines_processed)) + print_nmb_every = stats.print_nmb_every + (len(str(stats.lines_processed)) * stats.print_dot_every) + if stats.lines_processed % print_nmb_every == 0: + sys.stdout.write(str(stats.lines_processed)) sys.stdout.flush() return True @@ -218,6 +303,8 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data): if __name__ == '__main__': argparser = nDPIsrvd.defaultArgumentParser() argparser.add_argument('--strict', action='store_true', default=False, help='Require and validate a full nDPId application lifecycle.') + argparser.add_argument('--enable-timeout-check', action='store_true', default=False, + help='Enable additional flow timeout validation. See README.md for more information') args = argparser.parse_args() address = nDPIsrvd.validateAddress(args) @@ -226,9 +313,9 @@ if __name__ == '__main__': nsock = nDPIsrvdSocket() nsock.connect(address) - stats = Stats() + stats = Stats(nsock) try: - nsock.loop(onJsonLineRecvd, stats) + nsock.loop(onJsonLineRecvd, onFlowCleanup, (args.strict, args.enable_timeout_check, stats)) except nDPIsrvd.SocketConnectionBroken as err: sys.stderr.write('\n{}\n'.format(err)) except KeyboardInterrupt: |