diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2020-09-30 20:35:28 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2020-09-30 20:35:28 +0200 |
commit | 9ea078b05e1191c6879cc9bd63bdb44cc5c3aec9 (patch) | |
tree | 57c28da238bc6e7b88b818b33611d3153264436e /contrib | |
parent | 2f7a052f436454fde1f951e7210aca64b24e1548 (diff) |
Improved python event validation with focus on readability.
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'contrib')
-rw-r--r-- | contrib/nDPIsrvd.py | 130 |
1 files changed, 62 insertions, 68 deletions
diff --git a/contrib/nDPIsrvd.py b/contrib/nDPIsrvd.py index faba82a1d..7586e4a0e 100644 --- a/contrib/nDPIsrvd.py +++ b/contrib/nDPIsrvd.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import argparse +import array import json import re import os @@ -19,7 +20,9 @@ PKT_TYPE_ETH_IP4 = 0x0800 PKT_TYPE_ETH_IP6 = 0x86DD EVENT_UNKNOWN = 'Unknown' -DAEMON_EVENTS = ['Invalid', 'Init', 'Reconnect', 'Shutdown'] +# Event tuple: (pretty-name, real-name) +DAEMON_EVENTS = [ ('Invalid','invalid'), ('Init','init'), \ + ('Reconnect','reconnect'), ('Shutdown','shutdown') ] BASIC_EVENTS = ['Invalid', 'Unknown-Datalink-Layer', 'Unknown-Layer3-Protocol', 'Non-IP-Packet', 'Ethernet-Packet-Too-Short', 'Ethernet-Packet-Unknown', 'IP4-Packet-Too-Short', 'IP4-Size-Smaller-Than-Header', 'IP4-Layer4-Payload-Detection-Failed', 'IP6-Packet-Too-Short', @@ -27,8 +30,9 @@ BASIC_EVENTS = ['Invalid', 'Unknown-Datalink-Layer', 'Unknown-Layer3-Protocol', 'UDP-Packet-Too-Short', 'Capture-Size-Smaller-Than-Packet-Size', 'Max-Flow-To-Track', 'Flow-Memory-Allocation-Failed', 'NDPI-Flow-Memory-Allocation-Failed', 'NDPI-ID-Memory-Allocation-Failed'] -PACKET_EVENTS = ['Invalid', 'Packet', 'Packet-Flow'] -FLOW_EVENTS = ['Invalid', 'New', 'End', 'Idle', 'Guessed', 'Detected', 'Detection-Update', 'Not-Detected'] +PACKET_EVENTS = [ ('Invalid','invalid'), ('Packet','packet'), ('Packet-Flow','packet-flow') ] +FLOW_EVENTS = [ ('Invalid','invalid'), ('New','new'), ('End','end'), ('Idle','idle'), ('Guessed','guessed'), \ + ('Detected','detected'), ('Detection-Update','detection-update'), ('Not-Detected','not-detected') ] class TermColor: WARNING = '\033[93m' @@ -157,82 +161,72 @@ def JsonParseBytes(json_bytes): class nDPIdEvent: isValid = False DaemonEventID = -1 - DaemonEventName = EVENT_UNKNOWN - BasicEventID = -1 - BasicEventName = EVENT_UNKNOWN + DaemonEventName = None + DaemonEventPrettyName = EVENT_UNKNOWN + BasicEventID = -1 + BasicEventName = None + BasicEventPrettyName = EVENT_UNKNOWN PacketEventID = -1 - PacketEventName = EVENT_UNKNOWN - FlowEventID = -1 - FlowEventName = EVENT_UNKNOWN + PacketEventName = None + PacketEventPrettyName = EVENT_UNKNOWN + FlowEventID = -1 + FlowEventName = None + FlowEventPrettyName = EVENT_UNKNOWN -def validateFlowEventID(event_id): - if type(event_id) is not int: - raise RuntimeError('Argument is not an Integer/EventID!') + def validateEvent(self, event_id, event_name, list_of_event_tuples): + if self.isValid is True: + raise RuntimeError('nDPId event already validated. Multiple Events in one JSON strings are not allowed.') - if event_id < 0 or event_id > len(FLOW_EVENTS): - raise RuntimeError('Unknown flow event id: {}.'.format(event_id)) - else: - event_str = FLOW_EVENTS[event_id] - - return event_str + if type(event_id) is not int: + raise RuntimeError('Argument is not an Integer/EventID!') -def validatePacketEventID(event_id): - if type(event_id) is not int: - raise RuntimeError('Argument is not an Integer/EventID!') + if event_id < 0 or event_id >= len(list_of_event_tuples): + raise RuntimeError('Unknown event id: {} aka {}.'.format(event_id, event_name)) - if event_id < 0 or event_id > len(PACKET_EVENTS): - raise RuntimeError('Unknown packet event id: {}.'.format(event_id)) - else: - event_str = PACKET_EVENTS[event_id] + if type(list_of_event_tuples[0]) == tuple and list_of_event_tuples[event_id][1] != event_name: + raise RuntimeError('Unknown event name: {}.'.format(event_name)) - return event_str + self.isValid = True + return list_of_event_tuples[event_id][0] if type(list_of_event_tuples[0]) == tuple \ + else list_of_event_tuples[event_id] -def validateBasicEventID(event_id): - if type(event_id) is not int: - raise RuntimeError('Argument is not an Integer/EventID!') + def validateFlowEvent(self): + return self.validateEvent(self.FlowEventID, self.FlowEventName, FLOW_EVENTS) - if event_id < 0 or event_id > len(BASIC_EVENTS): - raise RuntimeError('Unknown basic event id: {}.'.format(event_id)) - else: - event_str = BASIC_EVENTS[event_id] + def validatePacketEvent(self): + return self.validateEvent(self.PacketEventID, self.PacketEventName, PACKET_EVENTS) - return event_str + def validateBasicEvent(self): + return self.validateEvent(self.BasicEventID, self.BasicEventName, BASIC_EVENTS) -def validateDaemonEventID(event_id): - if type(event_id) is not int: - raise RuntimeError('Argument is not an Integer/EventID!') + def validateDaemonEvent(self): + return self.validateEvent(self.DaemonEventID, self.DaemonEventName, DAEMON_EVENTS) - if event_id < 0 or event_id > len(DAEMON_EVENTS): - raise RuntimeError('Unknown daemon event id: {}.'.format(event_id)) - else: - event_str = DAEMON_EVENTS[event_id] - - return event_str - -def validateJsonEventTypes(json_dict): - if type(json_dict) is not dict: - raise RuntimeError('Argument is not a dictionary!') - - nev = nDPIdEvent() - - if 'daemon_event_id' in json_dict: - nev.DaemonEventID = json_dict['daemon_event_id'] - nev.DaemonEventName = validateDaemonEventID(nev.DaemonEventID) - nev.isValid = True - if 'basic_event_id' in json_dict: - nev.BasicEventID = json_dict['basic_event_id'] - nev.BasicEventName = validateBasicEventID(nev.BasicEventID) - nev.isValid = True - if 'packet_event_id' in json_dict: - nev.PacketEventID = json_dict['packet_event_id'] - nev.PacketEventName = validatePacketEventID(nev.PacketEventID) - nev.isValid = True - if 'flow_event_id' in json_dict: - nev.FlowEventID = json_dict['flow_event_id'] - nev.FlowEventName = validateFlowEventID(nev.FlowEventID) - nev.isValid = True - - return nev + @staticmethod + def validateJsonEventTypes(json_dict): + if type(json_dict) is not dict: + raise RuntimeError('Argument is not a dictionary!') + + nev = nDPIdEvent() + + if 'daemon_event_id' in json_dict: + nev.DaemonEventID = json_dict['daemon_event_id'] + nev.DaemonEventName = json_dict['daemon_event_name'] + nev.DaemonEventPrettyName = nev.validateDaemonEvent() + if 'basic_event_id' in json_dict: + nev.BasicEventID = json_dict['basic_event_id'] + nev.BasicEventName = json_dict['basic_event_name'] + nev.BasicEventPrettyName = nev.validateBasicEvent() + if 'packet_event_id' in json_dict: + nev.PacketEventID = json_dict['packet_event_id'] + nev.PacketEventName = json_dict['packet_event_name'] + nev.PacketEventPrettyName = nev.validatePacketEvent() + if 'flow_event_id' in json_dict: + nev.FlowEventID = json_dict['flow_event_id'] + nev.FlowEventName = json_dict['flow_event_name'] + nev.FlowEventPrettyName = nev.validateFlowEvent() + + return nev def defaultArgumentParser(): parser = argparse.ArgumentParser(description='nDPIsrvd options', formatter_class=argparse.ArgumentDefaultsHelpFormatter) |