diff options
-rw-r--r-- | README.md | 8 | ||||
-rwxr-xr-x | examples/py-flow-info/flow-info.py | 64 |
2 files changed, 56 insertions, 16 deletions
@@ -11,7 +11,7 @@ This value serves as unique identifier for the processing thread. Multithreaded nDPId uses libnDPI's JSON serialization to produce meaningful JSON output which it then sends to the nDPIsrvd for distribution. High level applications can connect to nDPIsrvd to get the latest flow/packet events from nDPId. -TODO: Provide some sort of authentication for connecting distributor clients (somehow very critical). +TODO: Provide some sort of AEAD for connecting distributor clients via TCP (somehow very critical). # architecture @@ -24,7 +24,7 @@ _______________________ ________________ | | | nDPIsrvd | | | | nDPId --- Thread 1 >| ---> |> | <| <--- |< example/c-json-stdout | | `- Thread 2 >| ---> |> collector | distributor <| <--- |< example/py-flow-info | -| `- Thread N >| ---> |> | <| <--- | | +| `- Thread N >| ---> |> | <| <--- | ... | |_____________________| ^ |____________|______________| ^ |________________________| | | `- connect to UNIX socket `- connect to TCP socket @@ -64,13 +64,13 @@ make all examples # run -Daemon mode: +Daemons: ```shell ./nDPIsrvd -d ./nDPId -d ``` -And why not a simple flow-info example: +And why not a flow-info example? ```shell ./examples/py-flow-info/flow-info.py ``` diff --git a/examples/py-flow-info/flow-info.py b/examples/py-flow-info/flow-info.py index 41f695e02..3dd41c87d 100755 --- a/examples/py-flow-info/flow-info.py +++ b/examples/py-flow-info/flow-info.py @@ -20,7 +20,12 @@ class nDPIsrvdSocket: self.digitlen = 0 def receive(self): - recvd = self.sock.recv(NETWORK_BUFFER_MAX_SIZE - len(self.buffer)).decode(errors='strict') + try: + recvd_buf = self.sock.recv(NETWORK_BUFFER_MAX_SIZE - len(self.buffer)) + recvd = recvd_buf.decode(errors='strict') + except UnicodeDecodeError as exc: + raise RuntimeError('Unicode Exception: {}\n\nReceived String: {}'.format(str(exc), str(recvd_buf))) + if recvd == '': raise RuntimeError('socket connection broken') self.buffer += recvd @@ -38,15 +43,26 @@ class nDPIsrvdSocket: if len(self.buffer) >= self.msglen + self.digitlen: recvd = self.buffer[self.digitlen:self.msglen + self.digitlen] self.buffer = self.buffer[self.msglen + self.digitlen:] + retval += [(recvd,self.msglen,self.digitlen)] + self.msglen = 0 self.digitlen = 0 - retval += [recvd] return retval +class TermColor: + WARNING = '\033[93m' + FAIL = '\033[91m' + BOLD = '\033[1m' + END = '\033[0m' + BLINK = "\x1b[5m" + def parse_json_str(json_str): - j = json.loads(json_str) + try: + j = json.loads(json_str[0]) + except json.decoder.JSONDecodeError as exc: + raise RuntimeError('JSON Exception: {}\n\nJSON String: {}\n'.format(str(exc), str(json_str))) if 'flow_event_name' in j: event = j['flow_event_name'].lower() @@ -65,23 +81,48 @@ def parse_json_str(json_str): else: raise RuntimeError('unknown flow event name: {}'.format(event)) + ndpi_proto_categ = '' + ndpi_frisk = '' + + if 'ndpi' in j: + if 'proto' in j['ndpi']: + ndpi_proto_categ += '[' + str(j['ndpi']['proto']) + ']' + + if 'category' in j['ndpi']: + ndpi_proto_categ += '[' + str(j['ndpi']['category']) + ']' + + if 'flow_risk' in j['ndpi']: + cnt = 0 + for key in j['ndpi']['flow_risk']: + ndpi_frisk += str(j['ndpi']['flow_risk'][key]) + ', ' + cnt += 1 + ndpi_frisk = '{}: {}'.format( + TermColor.WARNING + TermColor.BOLD + 'RISK' + TermColor.END if cnt < 2 + else TermColor.FAIL + TermColor.BOLD + TermColor.BLINK + 'RISK' + TermColor.END, + ndpi_frisk[:-2]) + if j['l3_proto'] == 'ip4': - print('{:>14}: [{:>8}] [{}][{:>5}] [{:>15}][{:>5}] -> [{:>15}][{:>5}]'.format(event_str, + print('{:>14}: [{:.>8}] [{}][{:.>5}] [{:.>15}]{} -> [{:.>15}]{} {}'.format(event_str, j['flow_id'], j['l3_proto'], j['l4_proto'], j['src_ip'].lower(), - j['src_port'] if 'src_port' in j else '', + '[{:.>5}]'.format(j['src_port']) if 'src_port' in j else '', j['dst_ip'].lower(), - j['dst_port'] if 'dst_port' in j else '')) + '[{:.>5}]'.format(j['dst_port']) if 'dst_port' in j else '', + ndpi_proto_categ)) elif j['l3_proto'] == 'ip6': - print('{:>14}: [{:>8}] [{}][{:>5}] [{:>39}][{:>5}] -> [{:>39}][{:>5}]'.format(event_str, + print('{:>14}: [{:.>8}] [{}][{:.>5}] [{:.>39}]{} -> [{:.>39}]{} {}'.format(event_str, j['flow_id'], j['l3_proto'], j['l4_proto'], j['src_ip'].lower(), - j['src_port'] if 'src_port' in j else '', + '[{:.>5}]'.format(j['src_port']) if 'src_port' in j else '', j['dst_ip'].lower(), - j['dst_port'] if 'dst_port' in j else '')) + '[{:.>5}]'.format(j['dst_port']) if 'dst_port' in j else '', + ndpi_proto_categ)) else: raise RuntimeError('unsupported l3 protocol: {}'.format(j['l3_proto'])) + if len(ndpi_frisk) > 0: + print('{:>16}{}'.format('', ndpi_frisk)) + if __name__ == '__main__': host = HOST @@ -102,7 +143,6 @@ if __name__ == '__main__': while True: received = nsock.receive() - for line in received: - #print(line) - parse_json_str(line) + for received_json_pkt in received: + parse_json_str(received_json_pkt) |