diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2021-12-15 23:25:32 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2022-01-20 00:50:38 +0100 |
commit | 9e07a57566cc45bf92a845d8cee968d72e0f314e (patch) | |
tree | 8f1a6bfd08bd68a5253fadf3a01beecda77b1c95 /examples/py-flow-info/flow-info.py | |
parent | a35fc1d5ea8570609cc0c8cf6edadc81f8f5bb76 (diff) |
Major nDPId extension. Sorry for the huge commit.
- nDPId: fixed invalid IP4/IP6 tuple compare
- nDPIsrvd: fixed caching issue (finally)
- added tiny c example (can be used to check flow manager sanity)
- c-captured: use flow_last_seen timestamp from `struct nDPIsrvd_flow`
- README.md update: added example JSON sequence
- nDPId: added new flow event `update` necessary for correct
timeout handling (and other future use-cases)
- nDPIsrvd.h and nDPIsrvd.py: switched to an instance
(consists of an alias/source tuple) based flow manager
- every flow related event **must** now serialize `alias`, `source`,
`flow_id`, `flow_last_seen` and `flow_idle_time` to make the timeout
handling and verification process work correctly
- nDPIsrvd.h: ability to profile any dynamic memory (de-)allocation
- nDPIsrvd.py: removed PcapPacket class (unused)
- py-flow-dashboard and py-flow-multiprocess: fixed race condition
- py-flow-info: print statusbar with probably useful information
- nDPId/nDPIsrvd.h: switched from packet-flow only timestamps (`pkt_*sec`)
to a generic flow event timestamp `ts_msec`
- nDPId-test: added additional checks
- nDPId: increased ICMP flow timeout
- nDPId: using event based i/o if capturing packets from a device
- nDPIsrvd: fixed memory leak on shutdown if remote descriptors
were still connected
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'examples/py-flow-info/flow-info.py')
-rwxr-xr-x | examples/py-flow-info/flow-info.py | 187 |
1 files changed, 181 insertions, 6 deletions
diff --git a/examples/py-flow-info/flow-info.py b/examples/py-flow-info/flow-info.py index 9dfa40957..1f25cea55 100755 --- a/examples/py-flow-info/flow-info.py +++ b/examples/py-flow-info/flow-info.py @@ -1,7 +1,9 @@ #!/usr/bin/env python3 import os +import math import sys +import time sys.path.append(os.path.dirname(sys.argv[0]) + '/../share/nDPId') sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId') @@ -16,6 +18,153 @@ except ImportError: global args global whois_db +def set_attr_from_dict(some_object, some_dict, key_and_attr_name, default_value): + try: + setattr(some_object, key_and_attr_name, some_dict[key_and_attr_name]) + except KeyError: + if default_value is not None and getattr(some_object, key_and_attr_name, None) is None: + setattr(some_object, key_and_attr_name, default_value) + +def set_attr_if_not_set(some_object, attr_name, value): + try: + getattr(some_object, attr_name) + except AttributeError: + setattr(some_object, attr_name, value) + +class Stats: + last_status_length = 0 + avg_xfer_json_bytes = 0.0 + expired_tot_l4_payload_len = 0 + expired_avg_l4_payload_len = 0 + total_flows = 0 + risky_flows = 0 + midstream_flows = 0 + guessed_flows = 0 + not_detected_flows = 0 + start_time = 0.0 + current_time = 0.0 + json_lines = 0 + spinner_state = 0 + + def __init__(self, nDPIsrvd_sock): + self.start_time = time.time() + self.nsock = nDPIsrvd_sock + + def updateSpinner(self): + if self.current_time + 0.25 <= time.time(): + self.spinner_state += 1 + + def getSpinner(self): + spinner_states = ['-', '\\', '|', '/'] + return spinner_states[self.spinner_state % len(spinner_states)] + + def getDataFromJson(self, json_dict, current_flow): + if current_flow is None: + return + + set_attr_from_dict(current_flow, json_dict, 'flow_tot_l4_payload_len', 0) + set_attr_from_dict(current_flow, json_dict, 'flow_avg_l4_payload_len', 0) + if 'ndpi' in json_dict: + set_attr_from_dict(current_flow, json_dict['ndpi'], 'flow_risk', {}) + else: + set_attr_from_dict(current_flow, {}, 'flow_risk', {}) + set_attr_from_dict(current_flow, json_dict, 'midstream', 0) + set_attr_from_dict(current_flow, json_dict, 'flow_event_name', '') + set_attr_if_not_set(current_flow, 'guessed', False) + set_attr_if_not_set(current_flow, 'not_detected', False) + if current_flow.flow_event_name == 'guessed': + current_flow.guessed = True + elif current_flow.flow_event_name == 'not-detected': + current_flow.not_detected = True + + def update(self, json_dict, current_flow): + self.updateSpinner() + self.json_lines += 1 + self.current_time = time.time() + self.avg_xfer_json_bytes = self.nsock.received_bytes / (self.current_time - self.start_time) + self.getDataFromJson(json_dict, current_flow) + + def updateOnCleanup(self, current_flow): + self.total_flows += 1 + self.expired_tot_l4_payload_len += current_flow.flow_tot_l4_payload_len + self.expired_avg_l4_payload_len += current_flow.flow_avg_l4_payload_len + self.risky_flows += 1 if len(current_flow.flow_risk) > 0 else 0 + self.midstream_flows += 1 if current_flow.midstream != 0 else 0 + self.guessed_flows += 1 if current_flow.guessed is True else 0 + self.not_detected_flows += 1 if current_flow.not_detected is True else 0 + + def getStatsFromFlowMgr(self): + alias_count = 0 + source_count = 0 + flow_count = 0 + flow_tot_l4_payload_len = 0.0 + flow_avg_l4_payload_len = 0.0 + risky = 0 + midstream = 0 + guessed = 0 + not_detected = 0 + + instances = self.nsock.flow_mgr.instances + for alias in instances: + alias_count += 1 + for source in instances[alias]: + source_count += 1 + for flow_id in instances[alias][source].flows: + flow_count += 1 + current_flow = instances[alias][source].flows[flow_id] + + flow_tot_l4_payload_len += current_flow.flow_tot_l4_payload_len + flow_avg_l4_payload_len += current_flow.flow_avg_l4_payload_len + risky += 1 if len(current_flow.flow_risk) > 0 else 0 + midstream += 1 if current_flow.midstream != 0 else 0 + guessed += 1 if current_flow.guessed is True else 0 + not_detected = 1 if current_flow.not_detected is True else 0 + + return alias_count, source_count, flow_count, \ + flow_tot_l4_payload_len, flow_avg_l4_payload_len, \ + risky, midstream, guessed, not_detected + + @staticmethod + def prettifyBytes(bytes_received): + size_names = ['B', 'KB', 'MB', 'GB', 'TB'] + if bytes_received == 0: + i = 0 + else: + i = min(int(math.floor(math.log(bytes_received, 1024))), len(size_names) - 1) + p = math.pow(1024, i) + s = round(bytes_received / p, 2) + return '{:.2f} {}'.format(s, size_names[i]) + + def resetStatus(self): + sys.stdout.write('\r' + str(' ' * self.last_status_length) + '\r') + sys.stdout.flush() + + def printStatus(self): + alias_count, source_count, flow_count, \ + tot_l4_payload_len, avg_l4_payload_len, \ + risky, midstream, guessed, not_detected = self.getStatsFromFlowMgr() + + out_str = '\r[n|tot|avg JSONs: {}|{}|{}/s] [tot|avg l4: {}|{}] ' \ + '[lss|srcs: {}|{}] ' \ + '[flws|rsky|mdstrm|!dtctd|gssd: {}|{}|{}|{}|{} / {}|{}|{}|{}|{}] [{}]' \ + ''.format(self.json_lines, + Stats.prettifyBytes(self.nsock.received_bytes), + Stats.prettifyBytes(self.avg_xfer_json_bytes), + Stats.prettifyBytes(tot_l4_payload_len + self.expired_tot_l4_payload_len), + Stats.prettifyBytes(avg_l4_payload_len + self.expired_avg_l4_payload_len), + alias_count, source_count, + flow_count, risky, midstream, not_detected, guessed, + flow_count + self.total_flows, + risky + self.risky_flows, + midstream + self.midstream_flows, + not_detected + self.not_detected_flows, + guessed + self.guessed_flows, + self.getSpinner()) + self.last_status_length = len(out_str) - 1 # '\r' + + sys.stdout.write(out_str) + sys.stdout.flush() + def prettifyEvent(color_list, whitespaces, text): term_attrs = str() for color in color_list: @@ -60,18 +209,35 @@ def whois(ip_str): return None return whois_db[ip_str] -def onJsonLineRecvd(json_dict, current_flow, global_user_data): +def onFlowCleanup(instance, current_flow, global_user_data): + stats = global_user_data + stats.updateOnCleanup(current_flow) + + return True + +def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data): + stats = global_user_data + stats.update(json_dict, current_flow) + stats.resetStatus() + instance_and_source = '' - instance_and_source += '[{}]'.format(TermColor.setColorByString(json_dict['alias'])) - instance_and_source += '[{}]'.format(TermColor.setColorByString(json_dict['source'])) + instance_and_source += '[{}]'.format(TermColor.setColorByString(instance.alias)) + instance_and_source += '[{}]'.format(TermColor.setColorByString(instance.source)) + if 'daemon_event_id' in json_dict: + print('{} {}: {}'.format(instance_and_source, prettifyEvent([TermColor.WARNING, TermColor.BLINK], 16, 'DAEMON-EVENT'), json_dict['daemon_event_name'])) + stats.printStatus() + return True if 'basic_event_id' in json_dict: - print('{} {}: {}'.format(instance_and_source, prettifyEvent([TermColor.WARNING, TermColor.BLINK], 16, 'BASIC-EVENT'), json_dict['basic_event_name'])) + print('{} {}: {}'.format(instance_and_source, prettifyEvent([TermColor.FAIL, TermColor.BLINK], 16, 'BASIC-EVENT'), json_dict['basic_event_name'])) + stats.printStatus() return True elif 'flow_event_id' not in json_dict: + stats.printStatus() return True if checkEventFilter(json_dict) is False: + stats.printStatus() return True ndpi_proto_categ_breed = '' @@ -99,8 +265,11 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data): line_suffix = '' flow_event_name = '' - if json_dict['flow_event_name'] == 'guessed' or json_dict['flow_event_name'] == 'not-detected': + if json_dict['flow_event_name'] == 'guessed': flow_event_name += '{}{:>16}{}'.format(TermColor.HINT, json_dict['flow_event_name'], TermColor.END) + elif json_dict['flow_event_name'] == 'not-detected': + flow_event_name += '{}{:>16}{}'.format(TermColor.WARNING + TermColor.BOLD + TermColor.BLINK, + json_dict['flow_event_name'], TermColor.END) else: if json_dict['flow_event_name'] == 'new': line_suffix = '' @@ -145,6 +314,8 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data): if len(ndpi_frisk) > 0: print('{} {:>18}{}'.format(instance_and_source, '', ndpi_frisk)) + stats.printStatus() + return True if __name__ == '__main__': @@ -169,4 +340,8 @@ if __name__ == '__main__': nsock = nDPIsrvdSocket() nsock.connect(address) - nsock.loop(onJsonLineRecvd, None) + stats = Stats(nsock) + try: + nsock.loop(onJsonLineRecvd, onFlowCleanup, stats) + except KeyboardInterrupt: + print('\n\nKeyboard Interrupt: cleaned up {} flows.'.format(len(nsock.shutdown()))) |