summaryrefslogtreecommitdiff
path: root/dependencies/nDPIsrvd.py
diff options
context:
space:
mode:
Diffstat (limited to 'dependencies/nDPIsrvd.py')
-rw-r--r--dependencies/nDPIsrvd.py632
1 files changed, 632 insertions, 0 deletions
diff --git a/dependencies/nDPIsrvd.py b/dependencies/nDPIsrvd.py
new file mode 100644
index 000000000..5ffa17e71
--- /dev/null
+++ b/dependencies/nDPIsrvd.py
@@ -0,0 +1,632 @@
+#!/usr/bin/env python3
+
+import argparse
+import array
+import json
+import re
+import os
+import stat
+import socket
+import sys
+
+try:
+ from colorama import Back, Fore, Style
+ USE_COLORAMA=True
+except ImportError:
+ sys.stderr.write('Python module colorama not found, using fallback.\n')
+ USE_COLORAMA=False
+
+DEFAULT_HOST = '127.0.0.1'
+DEFAULT_PORT = 7000
+DEFAULT_UNIX = '/tmp/ndpid-distributor.sock'
+
+NETWORK_BUFFER_MIN_SIZE = 6 # NETWORK_BUFFER_LENGTH_DIGITS + 1
+NETWORK_BUFFER_MAX_SIZE = 33792 # Please keep this value in sync with the one in config.h
+nDPId_PACKETS_PLEN_MAX = 8192 # Please keep this value in sync with the one in config.h
+
+PKT_TYPE_ETH_IP4 = 0x0800
+PKT_TYPE_ETH_IP6 = 0x86DD
+
+
+class TermColor:
+ HINT = '\033[33m'
+ WARNING = '\033[93m'
+ FAIL = '\033[91m'
+ BOLD = '\033[1m'
+ END = '\033[0m'
+ BLINK = '\x1b[5m'
+
+ if USE_COLORAMA is True:
+ COLOR_TUPLES = [ (Fore.BLUE, [Back.RED, Back.MAGENTA, Back.WHITE]),
+ (Fore.CYAN, [Back.MAGENTA, Back.RED, Back.WHITE]),
+ (Fore.GREEN, [Back.YELLOW, Back.RED, Back.MAGENTA, Back.WHITE]),
+ (Fore.MAGENTA, [Back.CYAN, Back.BLUE, Back.WHITE]),
+ (Fore.RED, [Back.GREEN, Back.BLUE, Back.WHITE]),
+ (Fore.WHITE, [Back.BLACK, Back.MAGENTA, Back.RED, Back.BLUE]),
+ (Fore.YELLOW, [Back.RED, Back.CYAN, Back.BLUE, Back.WHITE]),
+ (Fore.LIGHTBLUE_EX, [Back.LIGHTRED_EX, Back.RED]),
+ (Fore.LIGHTCYAN_EX, [Back.LIGHTMAGENTA_EX, Back.MAGENTA]),
+ (Fore.LIGHTGREEN_EX, [Back.LIGHTYELLOW_EX, Back.YELLOW]),
+ (Fore.LIGHTMAGENTA_EX, [Back.LIGHTCYAN_EX, Back.CYAN]),
+ (Fore.LIGHTRED_EX, [Back.LIGHTGREEN_EX, Back.GREEN]),
+ (Fore.LIGHTWHITE_EX, [Back.LIGHTBLACK_EX, Back.BLACK]),
+ (Fore.LIGHTYELLOW_EX, [Back.LIGHTRED_EX, Back.RED]) ]
+
+ @staticmethod
+ def disableColor():
+ TermColor.HINT = ''
+ TermColor.WARNING = ''
+ TermColor.FAIL = ''
+ TermColor.BOLD = ''
+ TermColor.END = ''
+ TermColor.BLINK = ''
+ global USE_COLORAMA
+ USE_COLORAMA = False
+
+ @staticmethod
+ def calcColorHash(string):
+ h = 0
+ for char in string:
+ h += ord(char)
+ return h
+
+ @staticmethod
+ def getColorsByHash(string):
+ h = TermColor.calcColorHash(string)
+ tuple_index = h % len(TermColor.COLOR_TUPLES)
+ bg_tuple_index = h % len(TermColor.COLOR_TUPLES[tuple_index][1])
+ return (TermColor.COLOR_TUPLES[tuple_index][0],
+ TermColor.COLOR_TUPLES[tuple_index][1][bg_tuple_index])
+
+ @staticmethod
+ def setColorByString(string):
+ global USE_COLORAMA
+ if USE_COLORAMA is True:
+ fg_color, bg_color = TermColor.getColorsByHash(string)
+ color_hash = TermColor.calcColorHash(string)
+ return '{}{}{}{}{}'.format(Style.BRIGHT, fg_color, bg_color, string, Style.RESET_ALL)
+ else:
+ return '{}{}{}'.format(TermColor.BOLD, string, TermColor.END)
+
+class ThreadData:
+ pass
+
+class Instance:
+
+ def __init__(self, alias, source):
+ self.alias = str(alias)
+ self.source = str(source)
+ self.flows = dict()
+ self.thread_data = dict()
+
+ def __str__(self):
+ return '<%s.%s object at %s with alias %s, source %s>' % (
+ self.__class__.__module__,
+ self.__class__.__name__,
+ hex(id(self)),
+ self.alias,
+ self.source
+ )
+
+ def getThreadData(self, thread_id):
+ if thread_id not in self.thread_data:
+ return None
+ return self.thread_data[thread_id]
+
+ def getThreadDataFromJSON(self, json_dict):
+ if 'thread_id' not in json_dict:
+ return None
+ return self.getThreadData(json_dict['thread_id'])
+
+ def getMostRecentFlowTime(self, thread_id):
+ return self.thread_data[thread_id].most_recent_flow_time
+
+ def setMostRecentFlowTime(self, thread_id, most_recent_flow_time):
+ if thread_id in self.thread_data:
+ return self.thread_data[thread_id]
+
+ self.thread_data[thread_id] = ThreadData()
+ self.thread_data[thread_id].most_recent_flow_time = most_recent_flow_time
+ return self.thread_data[thread_id]
+
+ def getMostRecentFlowTimeFromJSON(self, json_dict):
+ if 'thread_id' not in json_dict:
+ return 0
+ return self.getThreadData(json_dict['thread_id']).most_recent_flow_time
+
+ def setMostRecentFlowTimeFromJSON(self, json_dict):
+ if 'thread_id' not in json_dict:
+ return
+ thread_id = json_dict['thread_id']
+ if 'thread_ts_usec' in json_dict:
+ mrtf = self.getMostRecentFlowTime(thread_id) if thread_id in self.thread_data else 0
+ self.setMostRecentFlowTime(thread_id, max(json_dict['thread_ts_usec'], mrtf))
+
+class Flow:
+
+ def __init__(self, flow_id, thread_id):
+ self.flow_id = flow_id
+ self.thread_id = thread_id
+ self.flow_last_seen = -1
+ self.flow_idle_time = -1
+ self.cleanup_reason = -1
+
+ def __str__(self):
+ return '<%s.%s object at %s with flow id %d>' % (
+ self.__class__.__module__,
+ self.__class__.__name__,
+ hex(id(self)),
+ self.flow_id
+ )
+
+class FlowManager:
+ CLEANUP_REASON_INVALID = 0
+ CLEANUP_REASON_DAEMON_INIT = 1 # can happen if kill -SIGKILL $(pidof nDPId) or restart after SIGSEGV
+ CLEANUP_REASON_DAEMON_SHUTDOWN = 2 # graceful shutdown e.g. kill -SIGTERM $(pidof nDPId)
+ CLEANUP_REASON_FLOW_END = 3
+ CLEANUP_REASON_FLOW_IDLE = 4
+ CLEANUP_REASON_FLOW_TIMEOUT = 5 # nDPId died a long time ago w/o restart?
+ CLEANUP_REASON_APP_SHUTDOWN = 6 # your python app called FlowManager.doShutdown()
+
+ def __init__(self):
+ self.instances = dict()
+
+ def getInstance(self, json_dict):
+ if 'alias' not in json_dict or \
+ 'source' not in json_dict:
+ return None
+
+ alias = json_dict['alias']
+ source = json_dict['source']
+
+ if alias not in self.instances:
+ self.instances[alias] = dict()
+ if source not in self.instances[alias]:
+ self.instances[alias][source] = Instance(alias, source)
+
+ self.instances[alias][source].setMostRecentFlowTimeFromJSON(json_dict)
+
+ return self.instances[alias][source]
+
+ @staticmethod
+ def getLastPacketTime(instance, flow_id, json_dict):
+ return max(int(json_dict['flow_src_last_pkt_time']), int(json_dict['flow_dst_last_pkt_time']), instance.flows[flow_id].flow_last_seen)
+
+ def getFlow(self, instance, json_dict):
+ if 'flow_id' not in json_dict:
+ return None
+
+ flow_id = int(json_dict['flow_id'])
+
+ if flow_id in instance.flows:
+ instance.flows[flow_id].flow_last_seen = FlowManager.getLastPacketTime(instance, flow_id, json_dict)
+ instance.flows[flow_id].flow_idle_time = int(json_dict['flow_idle_time'])
+ return instance.flows[flow_id]
+
+ thread_id = int(json_dict['thread_id'])
+ instance.flows[flow_id] = Flow(flow_id, thread_id)
+ instance.flows[flow_id].flow_last_seen = FlowManager.getLastPacketTime(instance, flow_id, json_dict)
+ instance.flows[flow_id].flow_idle_time = int(json_dict['flow_idle_time'])
+ instance.flows[flow_id].cleanup_reason = FlowManager.CLEANUP_REASON_INVALID
+
+ return instance.flows[flow_id]
+
+ def getFlowsToCleanup(self, instance, json_dict):
+ flows = dict()
+
+ if 'daemon_event_name' in json_dict:
+ if json_dict['daemon_event_name'].lower() == 'init' or \
+ json_dict['daemon_event_name'].lower() == 'shutdown':
+ # invalidate all existing flows with that alias/source/thread_id
+ for flow_id in instance.flows:
+ flow = instance.flows[flow_id]
+ if flow.thread_id != int(json_dict['thread_id']):
+ continue
+ if json_dict['daemon_event_name'].lower() == 'init':
+ flow.cleanup_reason = FlowManager.CLEANUP_REASON_DAEMON_INIT
+ else:
+ flow.cleanup_reason = FlowManager.CLEANUP_REASON_DAEMON_SHUTDOWN
+ flows[flow_id] = flow
+ for flow_id in flows:
+ del instance.flows[flow_id]
+ if len(instance.flows) == 0:
+ del self.instances[instance.alias][instance.source]
+
+ elif 'flow_event_name' in json_dict and \
+ (json_dict['flow_event_name'].lower() == 'end' or \
+ json_dict['flow_event_name'].lower() == 'idle' or \
+ json_dict['flow_event_name'].lower() == 'guessed' or \
+ json_dict['flow_event_name'].lower() == 'not-detected' or \
+ json_dict['flow_event_name'].lower() == 'detected'):
+ flow_id = json_dict['flow_id']
+ if json_dict['flow_event_name'].lower() == 'end':
+ instance.flows[flow_id].cleanup_reason = FlowManager.CLEANUP_REASON_FLOW_END
+ elif json_dict['flow_event_name'].lower() == 'idle':
+ instance.flows[flow_id].cleanup_reason = FlowManager.CLEANUP_REASON_FLOW_IDLE
+ # TODO: Flow Guessing/Detection can happen right before an idle event.
+ # We need to prevent that it results in a CLEANUP_REASON_FLOW_TIMEOUT.
+ # This may cause inconsistency and needs to be handled in another way.
+ if json_dict['flow_event_name'].lower() != 'guessed' and \
+ json_dict['flow_event_name'].lower() != 'not-detected' and \
+ json_dict['flow_event_name'].lower() != 'detected':
+ flows[flow_id] = instance.flows.pop(flow_id)
+
+ elif 'flow_last_seen' in json_dict:
+ if int(json_dict['flow_last_seen']) + int(json_dict['flow_idle_time']) < \
+ instance.getMostRecentFlowTimeFromJSON(json_dict):
+ flow_id = json_dict['flow_id']
+ instance.flows[flow_id].cleanup_reason = FlowManager.CLEANUP_REASON_FLOW_TIMEOUT
+ flows[flow_id] = instance.flows.pop(flow_id)
+
+ return flows
+
+ def doShutdown(self):
+ flows = dict()
+
+ for alias in self.instances:
+ for source in self.instances[alias]:
+ for flow_id in self.instances[alias][source].flows:
+ flow = self.instances[alias][source].flows[flow_id]
+ flow.cleanup_reason = FlowManager.CLEANUP_REASON_APP_SHUTDOWN
+ flows[flow_id] = flow
+
+ del self.instances
+
+ return flows
+
+ def verifyFlows(self):
+ invalid_flows = list()
+
+ for alias in self.instances:
+ for source in self.instances[alias]:
+ for flow_id in self.instances[alias][source].flows:
+ thread_id = self.instances[alias][source].flows[flow_id].thread_id
+ if self.instances[alias][source].flows[flow_id].flow_last_seen + \
+ self.instances[alias][source].flows[flow_id].flow_idle_time < \
+ self.instances[alias][source].getMostRecentFlowTime(thread_id):
+ invalid_flows += [flow_id]
+
+ return invalid_flows
+
+class nDPIsrvdException(Exception):
+ UNSUPPORTED_ADDRESS_TYPE = 1
+ BUFFER_CAPACITY_REACHED = 2
+ SOCKET_CONNECTION_BROKEN = 3
+ INVALID_LINE_RECEIVED = 4
+ CALLBACK_RETURNED_FALSE = 5
+ SOCKET_TIMEOUT = 6
+ JSON_DECODE_ERROR = 7
+
+ def __init__(self, etype):
+ self.etype = etype
+ def __str__(self):
+ return 'nDPIsrvdException type {}'.format(self.etype)
+
+class UnsupportedAddressType(nDPIsrvdException):
+ def __init__(self, addr):
+ super().__init__(nDPIsrvdException.UNSUPPORTED_ADDRESS_TYPE)
+ self.addr = addr
+ def __str__(self):
+ return '{}'.format(str(self.addr))
+
+class BufferCapacityReached(nDPIsrvdException):
+ def __init__(self, current_length, max_length):
+ super().__init__(nDPIsrvdException.BUFFER_CAPACITY_REACHED)
+ self.current_length = current_length
+ self.max_length = max_length
+ def __str__(self):
+ return '{} of {} bytes'.format(self.current_length, self.max_length)
+
+class SocketConnectionBroken(nDPIsrvdException):
+ def __init__(self):
+ super().__init__(nDPIsrvdException.SOCKET_CONNECTION_BROKEN)
+ def __str__(self):
+ return 'Disconnected.'
+
+class InvalidLineReceived(nDPIsrvdException):
+ def __init__(self, packet_buffer):
+ super().__init__(nDPIsrvdException.INVALID_LINE_RECEIVED)
+ self.packet_buffer = packet_buffer
+ def __str__(self):
+ return 'Received JSON line is invalid.'
+
+class CallbackReturnedFalse(nDPIsrvdException):
+ def __init__(self):
+ super().__init__(nDPIsrvdException.CALLBACK_RETURNED_FALSE)
+ def __str__(self):
+ return 'Callback returned False, abort.'
+
+class SocketTimeout(nDPIsrvdException):
+ def __init__(self):
+ super().__init__(nDPIsrvdException.SOCKET_TIMEOUT)
+ def __str__(self):
+ return 'Socket timeout.'
+
+class JsonDecodeError(nDPIsrvdException):
+ def __init__(self, json_exception, failed_line):
+ super().__init__(nDPIsrvdException.JSON_DECODE_ERROR)
+ self.json_exception = json_exception
+ self.failed_line = failed_line
+ def __str__(self):
+ return '{}: {}'.format(self.json_exception, self.failed_line)
+
+class JsonFilter():
+ def __init__(self, filter_string):
+ self.filter_string = filter_string
+ self.filter = compile(filter_string, '<string>', 'eval')
+ def evaluate(self, json_dict):
+ if type(json_dict) is not dict:
+ raise nDPIsrvdException('Could not evaluate JSON Filter: expected dictionary, got {}'.format(type(json_dict)))
+ return eval(self.filter, {'json_dict': json_dict})
+
+class nDPIsrvdSocket:
+ def __init__(self):
+ self.sock_family = None
+ self.flow_mgr = FlowManager()
+ self.received_bytes = 0
+ self.json_filter = list()
+
+ def addFilter(self, filter_str):
+ self.json_filter.append(JsonFilter(filter_str))
+
+ def evalFilters(self, json_dict):
+ for jf in self.json_filter:
+ try:
+ json_filter_retval = jf.evaluate(json_dict)
+ except Exception as err:
+ print()
+ sys.stderr.write('Error while evaluating expression "{}"\n'.format(jf.filter_string))
+ raise err
+
+ if not isinstance(json_filter_retval, bool):
+ print()
+ sys.stderr.write('Error while evaluating expression "{}"\n'.format(jf.filter_string))
+ raise nDPIsrvdException('JSON Filter returned an invalid type: expected bool, got {}'.format(type(json_filter_retval)))
+
+ if json_filter_retval is False:
+ return False
+
+ return True
+
+ def connect(self, addr):
+ if type(addr) is tuple:
+ self.sock_family = socket.AF_INET
+ elif type(addr) is str:
+ self.sock_family = socket.AF_UNIX
+ else:
+ raise UnsupportedAddressType(addr)
+
+ self.sock = socket.socket(self.sock_family, socket.SOCK_STREAM)
+ self.sock.connect(addr)
+ self.buffer = bytes()
+ self.msglen = 0
+ self.digitlen = 0
+ self.lines = []
+ self.failed_lines = []
+ self.filtered_lines = 0
+
+ def timeout(self, timeout):
+ self.sock.settimeout(timeout)
+
+ def receive(self):
+ if len(self.buffer) == NETWORK_BUFFER_MAX_SIZE:
+ raise BufferCapacityReached(len(self.buffer), NETWORK_BUFFER_MAX_SIZE)
+
+ connection_finished = False
+ try:
+ recvd = self.sock.recv(NETWORK_BUFFER_MAX_SIZE - len(self.buffer))
+ except ConnectionResetError:
+ connection_finished = True
+ recvd = bytes()
+ except TimeoutError:
+ raise SocketTimeout()
+ except socket.timeout:
+ raise SocketTimeout()
+
+ if len(recvd) == 0:
+ connection_finished = True
+
+ self.buffer += recvd
+
+ new_data_avail = False
+ while self.msglen + self.digitlen <= len(self.buffer):
+
+ if self.msglen == 0:
+ starts_with_digits = re.match(r'(^\d+){', self.buffer[:NETWORK_BUFFER_MIN_SIZE].decode(errors='strict'))
+ if starts_with_digits is None:
+ if len(self.buffer) < NETWORK_BUFFER_MIN_SIZE:
+ break
+ raise InvalidLineReceived(self.buffer)
+ self.msglen = int(starts_with_digits.group(1))
+ self.digitlen = len(starts_with_digits.group(1))
+
+ if len(self.buffer) >= self.msglen + self.digitlen:
+ recvd = self.buffer[self.digitlen:self.msglen + self.digitlen]
+ self.buffer = self.buffer[self.msglen + self.digitlen:]
+ self.lines += [(recvd,self.msglen,self.digitlen)]
+ new_data_avail = True
+
+ self.received_bytes += self.msglen + self.digitlen
+ self.msglen = 0
+ self.digitlen = 0
+
+ if connection_finished is True:
+ raise SocketConnectionBroken()
+
+ return new_data_avail
+
+ def parse(self, callback_json, callback_flow_cleanup, global_user_data):
+ retval = True
+
+ for received_line in self.lines:
+ try:
+ json_dict = json.loads(received_line[0].decode('ascii', errors='replace'), strict=True)
+ except json.decoder.JSONDecodeError as e:
+ json_dict = dict()
+ self.failed_lines += [received_line]
+ self.lines = self.lines[1:]
+ raise JsonDecodeError(e, received_line)
+
+ instance = self.flow_mgr.getInstance(json_dict)
+ if instance is None:
+ self.failed_lines += [received_line]
+ retval = False
+ continue
+
+ current_flow = self.flow_mgr.getFlow(instance, json_dict)
+ filter_eval = self.evalFilters(json_dict)
+ if filter_eval is True:
+ try:
+ if callback_json(json_dict, instance, current_flow, global_user_data) is not True:
+ self.failed_lines += [received_line]
+ retval = False
+ except Exception as e:
+ self.failed_lines += [received_line]
+ self.lines = self.lines[1:]
+ raise(e)
+ else:
+ self.filtered_lines += 1
+
+ for _, flow in self.flow_mgr.getFlowsToCleanup(instance, json_dict).items():
+ if callback_flow_cleanup is None:
+ pass
+ elif filter_eval is True and callback_flow_cleanup(instance, flow, global_user_data) is not True:
+ self.failed_lines += [received_line]
+ self.lines = self.lines[1:]
+ retval = False
+
+ self.lines = self.lines[1:]
+
+ return retval
+
+ def loop(self, callback_json, callback_flow_cleanup, global_user_data):
+ throw_ex = None
+
+ while True:
+ bytes_recv = 0
+ try:
+ bytes_recv = self.receive()
+ except Exception as err:
+ throw_ex = err
+
+ if self.parse(callback_json, callback_flow_cleanup, global_user_data) is False:
+ raise CallbackReturnedFalse()
+
+ if throw_ex is not None:
+ raise throw_ex
+
+ def shutdown(self):
+ return self.flow_mgr.doShutdown().items()
+
+ def verify(self):
+ if len(self.failed_lines) > 0:
+ raise nDPIsrvdException('Failed lines > 0: {}'.format(len(self.failed_lines)))
+ return self.flow_mgr.verifyFlows()
+
+def defaultArgumentParser(desc='nDPIsrvd Python Interface', enable_json_filter=False,
+ help_formatter=argparse.ArgumentDefaultsHelpFormatter):
+ parser = argparse.ArgumentParser(description=desc, formatter_class=help_formatter)
+ parser.add_argument('--host', type=str, help='nDPIsrvd host IP')
+ parser.add_argument('--port', type=int, default=DEFAULT_PORT, help='nDPIsrvd TCP port')
+ parser.add_argument('--unix', type=str, help='nDPIsrvd unix socket path')
+ if enable_json_filter is True:
+ parser.add_argument('--filter', type=str, action='append',
+ help='Set a filter string which if evaluates to True will invoke the JSON callback.\n'
+ 'Example: json_dict[\'flow_event_name\'] == \'detected\' will only process \'detected\' events.')
+ return parser
+
+def toSeconds(usec):
+ return usec / (1000 * 1000)
+
+def validateAddress(args):
+ tcp_addr_set = False
+ address = None
+
+ if args.host is None:
+ address_tcpip = (DEFAULT_HOST, args.port)
+ else:
+ address_tcpip = (args.host, args.port)
+ tcp_addr_set = True
+
+ if args.unix is None:
+ address_unix = DEFAULT_UNIX
+ else:
+ address_unix = args.unix
+
+ possible_sock_mode = 0
+ try:
+ possible_sock_mode = os.stat(address_unix).st_mode
+ except:
+ pass
+ if tcp_addr_set == False and stat.S_ISSOCK(possible_sock_mode):
+ address = address_unix
+ else:
+ address = address_tcpip
+
+ return address
+
+def prepareJsonFilter(args, nsock):
+ # HowTo use JSON Filters:
+ # Add `--filter [FILTER_STRING]` to the Python scripts that support JSON filtering.
+ # Examples:
+ # ./examples/py-json-stdout/json-stdout.py --filter '"ndpi" in json_dict and "proto" in json_dict["ndpi"]'
+ # The command above will print only JSONs that have the subobjects json_dict["ndpi"] and json_dict["ndpi"]["proto"] available.
+ # ./examples/py-flow-info/flow-info.py --filter 'json_dict["source"] == "eth0"' --filter '"flow_event_name" in json_dict and json_dict["flow_event_name"] == "analyse"'
+ # Multiple JSON filter will be ANDed together.
+ # Note: You may *only* use the global "json_dict" in your expressions.
+ try:
+ json_filter = args.filter
+ if json_filter is not None:
+ for jf in json_filter:
+ nsock.addFilter(jf)
+ except AttributeError:
+ pass
+
+global schema
+schema = {'packet_event_schema' : None, 'error_event_schema' : None, 'daemon_event_schema' : None, 'flow_event_schema' : None}
+
+def initSchemaValidator(schema_dirs=[]):
+ if len(schema_dirs) == 0:
+ schema_dirs += [os.path.dirname(sys.argv[0]) + '/../../schema']
+ schema_dirs += [os.path.dirname(sys.argv[0]) + '/../share/nDPId']
+ schema_dirs += [sys.base_prefix + '/share/nDPId']
+
+ for key in schema:
+ for schema_dir in schema_dirs:
+ try:
+ with open(schema_dir + '/' + str(key) + '.json', 'r') as schema_file:
+ schema[key] = json.load(schema_file)
+ except FileNotFoundError:
+ continue
+ else:
+ break
+
+def validateAgainstSchema(json_dict):
+ import jsonschema
+
+ if 'packet_event_id' in json_dict:
+ try:
+ jsonschema.Draft7Validator(schema=schema['packet_event_schema']).validate(instance=json_dict)
+ except AttributeError:
+ jsonschema.validate(instance=json_dict, schema=schema['packet_event_schema'])
+ return True
+ if 'error_event_id' in json_dict:
+ try:
+ jsonschema.Draft7Validator(schema=schema['error_event_schema']).validate(instance=json_dict)
+ except AttributeError:
+ jsonschema.validate(instance=json_dict, schema=schema['error_event_schema'])
+ return True
+ if 'daemon_event_id' in json_dict:
+ try:
+ jsonschema.Draft7Validator(schema=schema['daemon_event_schema']).validate(instance=json_dict)
+ except AttributeError:
+ jsonschema.validate(instance=json_dict, schema=schema['daemon_event_schema'])
+ return True
+ if 'flow_event_id' in json_dict:
+ try:
+ jsonschema.Draft7Validator(schema=schema['flow_event_schema']).validate(instance=json_dict)
+ except AttributeError:
+ jsonschema.validate(instance=json_dict, schema=schema['flow_event_schema'])
+ return True
+
+ return False