summaryrefslogtreecommitdiff
path: root/examples/py-flow-info/flow-info.py
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2021-12-15 23:25:32 +0100
committerToni Uhlig <matzeton@googlemail.com>2022-01-20 00:50:38 +0100
commit9e07a57566cc45bf92a845d8cee968d72e0f314e (patch)
tree8f1a6bfd08bd68a5253fadf3a01beecda77b1c95 /examples/py-flow-info/flow-info.py
parenta35fc1d5ea8570609cc0c8cf6edadc81f8f5bb76 (diff)
Major nDPId extension. Sorry for the huge commit.
- nDPId: fixed invalid IP4/IP6 tuple compare - nDPIsrvd: fixed caching issue (finally) - added tiny c example (can be used to check flow manager sanity) - c-captured: use flow_last_seen timestamp from `struct nDPIsrvd_flow` - README.md update: added example JSON sequence - nDPId: added new flow event `update` necessary for correct timeout handling (and other future use-cases) - nDPIsrvd.h and nDPIsrvd.py: switched to an instance (consists of an alias/source tuple) based flow manager - every flow related event **must** now serialize `alias`, `source`, `flow_id`, `flow_last_seen` and `flow_idle_time` to make the timeout handling and verification process work correctly - nDPIsrvd.h: ability to profile any dynamic memory (de-)allocation - nDPIsrvd.py: removed PcapPacket class (unused) - py-flow-dashboard and py-flow-multiprocess: fixed race condition - py-flow-info: print statusbar with probably useful information - nDPId/nDPIsrvd.h: switched from packet-flow only timestamps (`pkt_*sec`) to a generic flow event timestamp `ts_msec` - nDPId-test: added additional checks - nDPId: increased ICMP flow timeout - nDPId: using event based i/o if capturing packets from a device - nDPIsrvd: fixed memory leak on shutdown if remote descriptors were still connected Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'examples/py-flow-info/flow-info.py')
-rwxr-xr-xexamples/py-flow-info/flow-info.py187
1 files changed, 181 insertions, 6 deletions
diff --git a/examples/py-flow-info/flow-info.py b/examples/py-flow-info/flow-info.py
index 9dfa40957..1f25cea55 100755
--- a/examples/py-flow-info/flow-info.py
+++ b/examples/py-flow-info/flow-info.py
@@ -1,7 +1,9 @@
#!/usr/bin/env python3
import os
+import math
import sys
+import time
sys.path.append(os.path.dirname(sys.argv[0]) + '/../share/nDPId')
sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId')
@@ -16,6 +18,153 @@ except ImportError:
global args
global whois_db
+def set_attr_from_dict(some_object, some_dict, key_and_attr_name, default_value):
+ try:
+ setattr(some_object, key_and_attr_name, some_dict[key_and_attr_name])
+ except KeyError:
+ if default_value is not None and getattr(some_object, key_and_attr_name, None) is None:
+ setattr(some_object, key_and_attr_name, default_value)
+
+def set_attr_if_not_set(some_object, attr_name, value):
+ try:
+ getattr(some_object, attr_name)
+ except AttributeError:
+ setattr(some_object, attr_name, value)
+
+class Stats:
+ last_status_length = 0
+ avg_xfer_json_bytes = 0.0
+ expired_tot_l4_payload_len = 0
+ expired_avg_l4_payload_len = 0
+ total_flows = 0
+ risky_flows = 0
+ midstream_flows = 0
+ guessed_flows = 0
+ not_detected_flows = 0
+ start_time = 0.0
+ current_time = 0.0
+ json_lines = 0
+ spinner_state = 0
+
+ def __init__(self, nDPIsrvd_sock):
+ self.start_time = time.time()
+ self.nsock = nDPIsrvd_sock
+
+ def updateSpinner(self):
+ if self.current_time + 0.25 <= time.time():
+ self.spinner_state += 1
+
+ def getSpinner(self):
+ spinner_states = ['-', '\\', '|', '/']
+ return spinner_states[self.spinner_state % len(spinner_states)]
+
+ def getDataFromJson(self, json_dict, current_flow):
+ if current_flow is None:
+ return
+
+ set_attr_from_dict(current_flow, json_dict, 'flow_tot_l4_payload_len', 0)
+ set_attr_from_dict(current_flow, json_dict, 'flow_avg_l4_payload_len', 0)
+ if 'ndpi' in json_dict:
+ set_attr_from_dict(current_flow, json_dict['ndpi'], 'flow_risk', {})
+ else:
+ set_attr_from_dict(current_flow, {}, 'flow_risk', {})
+ set_attr_from_dict(current_flow, json_dict, 'midstream', 0)
+ set_attr_from_dict(current_flow, json_dict, 'flow_event_name', '')
+ set_attr_if_not_set(current_flow, 'guessed', False)
+ set_attr_if_not_set(current_flow, 'not_detected', False)
+ if current_flow.flow_event_name == 'guessed':
+ current_flow.guessed = True
+ elif current_flow.flow_event_name == 'not-detected':
+ current_flow.not_detected = True
+
+ def update(self, json_dict, current_flow):
+ self.updateSpinner()
+ self.json_lines += 1
+ self.current_time = time.time()
+ self.avg_xfer_json_bytes = self.nsock.received_bytes / (self.current_time - self.start_time)
+ self.getDataFromJson(json_dict, current_flow)
+
+ def updateOnCleanup(self, current_flow):
+ self.total_flows += 1
+ self.expired_tot_l4_payload_len += current_flow.flow_tot_l4_payload_len
+ self.expired_avg_l4_payload_len += current_flow.flow_avg_l4_payload_len
+ self.risky_flows += 1 if len(current_flow.flow_risk) > 0 else 0
+ self.midstream_flows += 1 if current_flow.midstream != 0 else 0
+ self.guessed_flows += 1 if current_flow.guessed is True else 0
+ self.not_detected_flows += 1 if current_flow.not_detected is True else 0
+
+ def getStatsFromFlowMgr(self):
+ alias_count = 0
+ source_count = 0
+ flow_count = 0
+ flow_tot_l4_payload_len = 0.0
+ flow_avg_l4_payload_len = 0.0
+ risky = 0
+ midstream = 0
+ guessed = 0
+ not_detected = 0
+
+ instances = self.nsock.flow_mgr.instances
+ for alias in instances:
+ alias_count += 1
+ for source in instances[alias]:
+ source_count += 1
+ for flow_id in instances[alias][source].flows:
+ flow_count += 1
+ current_flow = instances[alias][source].flows[flow_id]
+
+ flow_tot_l4_payload_len += current_flow.flow_tot_l4_payload_len
+ flow_avg_l4_payload_len += current_flow.flow_avg_l4_payload_len
+ risky += 1 if len(current_flow.flow_risk) > 0 else 0
+ midstream += 1 if current_flow.midstream != 0 else 0
+ guessed += 1 if current_flow.guessed is True else 0
+ not_detected = 1 if current_flow.not_detected is True else 0
+
+ return alias_count, source_count, flow_count, \
+ flow_tot_l4_payload_len, flow_avg_l4_payload_len, \
+ risky, midstream, guessed, not_detected
+
+ @staticmethod
+ def prettifyBytes(bytes_received):
+ size_names = ['B', 'KB', 'MB', 'GB', 'TB']
+ if bytes_received == 0:
+ i = 0
+ else:
+ i = min(int(math.floor(math.log(bytes_received, 1024))), len(size_names) - 1)
+ p = math.pow(1024, i)
+ s = round(bytes_received / p, 2)
+ return '{:.2f} {}'.format(s, size_names[i])
+
+ def resetStatus(self):
+ sys.stdout.write('\r' + str(' ' * self.last_status_length) + '\r')
+ sys.stdout.flush()
+
+ def printStatus(self):
+ alias_count, source_count, flow_count, \
+ tot_l4_payload_len, avg_l4_payload_len, \
+ risky, midstream, guessed, not_detected = self.getStatsFromFlowMgr()
+
+ out_str = '\r[n|tot|avg JSONs: {}|{}|{}/s] [tot|avg l4: {}|{}] ' \
+ '[lss|srcs: {}|{}] ' \
+ '[flws|rsky|mdstrm|!dtctd|gssd: {}|{}|{}|{}|{} / {}|{}|{}|{}|{}] [{}]' \
+ ''.format(self.json_lines,
+ Stats.prettifyBytes(self.nsock.received_bytes),
+ Stats.prettifyBytes(self.avg_xfer_json_bytes),
+ Stats.prettifyBytes(tot_l4_payload_len + self.expired_tot_l4_payload_len),
+ Stats.prettifyBytes(avg_l4_payload_len + self.expired_avg_l4_payload_len),
+ alias_count, source_count,
+ flow_count, risky, midstream, not_detected, guessed,
+ flow_count + self.total_flows,
+ risky + self.risky_flows,
+ midstream + self.midstream_flows,
+ not_detected + self.not_detected_flows,
+ guessed + self.guessed_flows,
+ self.getSpinner())
+ self.last_status_length = len(out_str) - 1 # '\r'
+
+ sys.stdout.write(out_str)
+ sys.stdout.flush()
+
def prettifyEvent(color_list, whitespaces, text):
term_attrs = str()
for color in color_list:
@@ -60,18 +209,35 @@ def whois(ip_str):
return None
return whois_db[ip_str]
-def onJsonLineRecvd(json_dict, current_flow, global_user_data):
+def onFlowCleanup(instance, current_flow, global_user_data):
+ stats = global_user_data
+ stats.updateOnCleanup(current_flow)
+
+ return True
+
+def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
+ stats = global_user_data
+ stats.update(json_dict, current_flow)
+ stats.resetStatus()
+
instance_and_source = ''
- instance_and_source += '[{}]'.format(TermColor.setColorByString(json_dict['alias']))
- instance_and_source += '[{}]'.format(TermColor.setColorByString(json_dict['source']))
+ instance_and_source += '[{}]'.format(TermColor.setColorByString(instance.alias))
+ instance_and_source += '[{}]'.format(TermColor.setColorByString(instance.source))
+ if 'daemon_event_id' in json_dict:
+ print('{} {}: {}'.format(instance_and_source, prettifyEvent([TermColor.WARNING, TermColor.BLINK], 16, 'DAEMON-EVENT'), json_dict['daemon_event_name']))
+ stats.printStatus()
+ return True
if 'basic_event_id' in json_dict:
- print('{} {}: {}'.format(instance_and_source, prettifyEvent([TermColor.WARNING, TermColor.BLINK], 16, 'BASIC-EVENT'), json_dict['basic_event_name']))
+ print('{} {}: {}'.format(instance_and_source, prettifyEvent([TermColor.FAIL, TermColor.BLINK], 16, 'BASIC-EVENT'), json_dict['basic_event_name']))
+ stats.printStatus()
return True
elif 'flow_event_id' not in json_dict:
+ stats.printStatus()
return True
if checkEventFilter(json_dict) is False:
+ stats.printStatus()
return True
ndpi_proto_categ_breed = ''
@@ -99,8 +265,11 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data):
line_suffix = ''
flow_event_name = ''
- if json_dict['flow_event_name'] == 'guessed' or json_dict['flow_event_name'] == 'not-detected':
+ if json_dict['flow_event_name'] == 'guessed':
flow_event_name += '{}{:>16}{}'.format(TermColor.HINT, json_dict['flow_event_name'], TermColor.END)
+ elif json_dict['flow_event_name'] == 'not-detected':
+ flow_event_name += '{}{:>16}{}'.format(TermColor.WARNING + TermColor.BOLD + TermColor.BLINK,
+ json_dict['flow_event_name'], TermColor.END)
else:
if json_dict['flow_event_name'] == 'new':
line_suffix = ''
@@ -145,6 +314,8 @@ def onJsonLineRecvd(json_dict, current_flow, global_user_data):
if len(ndpi_frisk) > 0:
print('{} {:>18}{}'.format(instance_and_source, '', ndpi_frisk))
+ stats.printStatus()
+
return True
if __name__ == '__main__':
@@ -169,4 +340,8 @@ if __name__ == '__main__':
nsock = nDPIsrvdSocket()
nsock.connect(address)
- nsock.loop(onJsonLineRecvd, None)
+ stats = Stats(nsock)
+ try:
+ nsock.loop(onJsonLineRecvd, onFlowCleanup, stats)
+ except KeyboardInterrupt:
+ print('\n\nKeyboard Interrupt: cleaned up {} flows.'.format(len(nsock.shutdown())))