diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2023-08-25 20:08:01 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2023-08-27 20:08:01 +0200 |
commit | a7bd3570b03f6b2fdc9bab09c956193708723cbf (patch) | |
tree | ad6f22edea20d3d718459ca6b3da8ee121e59b73 | |
parent | b01498f011eac9b91c076901ffb5c9c04e7691c0 (diff) |
Enable custom JSON filter expressions for Python scripts.
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rw-r--r-- | dependencies/nDPIsrvd.py | 78 | ||||
-rwxr-xr-x | examples/py-flow-info/flow-info.py | 28 | ||||
-rwxr-xr-x | examples/py-json-stdout/json-stdout.py | 3 |
3 files changed, 92 insertions, 17 deletions
diff --git a/dependencies/nDPIsrvd.py b/dependencies/nDPIsrvd.py index 166af5596..bd15a6bde 100644 --- a/dependencies/nDPIsrvd.py +++ b/dependencies/nDPIsrvd.py @@ -342,11 +342,43 @@ class SocketTimeout(nDPIsrvdException): def __str__(self): return 'Socket timeout.' +class JsonFilter(): + def __init__(self, filter_string): + self.filter_string = filter_string + self.filter = compile(filter_string, '<string>', 'eval') + def evaluate(self, json_dict): + if type(json_dict) is not dict: + raise nDPIsrvdException('Could not evaluate JSON Filter: expected dictionary, got {}'.format(type(json_dict))) + return eval(self.filter, {'json_dict': json_dict}) + class nDPIsrvdSocket: def __init__(self): self.sock_family = None self.flow_mgr = FlowManager() self.received_bytes = 0 + self.json_filter = list() + + def addFilter(self, filter_str): + self.json_filter.append(JsonFilter(filter_str)) + + def evalFilters(self, json_dict): + for jf in self.json_filter: + try: + json_filter_retval = jf.evaluate(json_dict) + except Exception as err: + print() + sys.stderr.write('Error while evaluating expression "{}"\n'.format(jf.filter_string)) + raise err + + if type(json_filter_retval) != bool: + print() + sys.stderr.write('Error while evaluating expression "{}"\n'.format(jf.filter_string)) + raise nDPIsrvdException('JSON Filter returned an invalid type: expected bool, got {}'.format(type(json_filter_retval))) + + if json_filter_retval is False: + return False + + return True def connect(self, addr): if type(addr) is tuple: @@ -363,6 +395,7 @@ class nDPIsrvdSocket: self.digitlen = 0 self.lines = [] self.failed_lines = [] + self.filtered_lines = 0 def timeout(self, timeout): self.sock.settimeout(timeout) @@ -432,19 +465,24 @@ class nDPIsrvdSocket: retval = False continue - try: - if callback_json(json_dict, instance, self.flow_mgr.getFlow(instance, json_dict), global_user_data) is not True: - self.failed_lines += [received_line] - retval = False - except Exception as e: - self.failed_lines += [received_line] - self.lines = self.lines[1:] - raise(e) + current_flow = self.flow_mgr.getFlow(instance, json_dict) + filter_eval = self.evalFilters(json_dict) + if filter_eval is True: + try: + if callback_json(json_dict, instance, current_flow, global_user_data) is not True: + self.failed_lines += [received_line] + retval = False + except Exception as e: + self.failed_lines += [received_line] + self.lines = self.lines[1:] + raise(e) + else: + self.filtered_lines += 1 for _, flow in self.flow_mgr.getFlowsToCleanup(instance, json_dict).items(): if callback_flow_cleanup is None: pass - elif callback_flow_cleanup(instance, flow, global_user_data) is not True: + elif filter_eval is True and callback_flow_cleanup(instance, flow, global_user_data) is not True: self.failed_lines += [received_line] self.lines = self.lines[1:] retval = False @@ -477,12 +515,15 @@ class nDPIsrvdSocket: raise nDPIsrvdException('Failed lines > 0: {}'.format(len(self.failed_lines))) return self.flow_mgr.verifyFlows() -def defaultArgumentParser(desc='nDPIsrvd Python Interface', +def defaultArgumentParser(desc='nDPIsrvd Python Interface', enable_json_filter=False, help_formatter=argparse.ArgumentDefaultsHelpFormatter): parser = argparse.ArgumentParser(description=desc, formatter_class=help_formatter) parser.add_argument('--host', type=str, help='nDPIsrvd host IP') parser.add_argument('--port', type=int, default=DEFAULT_PORT, help='nDPIsrvd TCP port') parser.add_argument('--unix', type=str, help='nDPIsrvd unix socket path') + if enable_json_filter is True: + parser.add_argument('--filter', type=str, action='append', + help='Set a filter string which if evaluates to True will invoke the JSON callback.') return parser def toSeconds(usec): @@ -515,6 +556,23 @@ def validateAddress(args): return address +def prepareJsonFilter(args, nsock): + # HowTo use JSON Filters: + # Add `--filter [FILTER_STRING]` to the Python scripts that support JSON filtering. + # Examples: + # ./examples/py-json-stdout/json-stdout.py --filter '"ndpi" in json_dict and "proto" in json_dict["ndpi"]' + # The command above will print only JSONs that have the subobjects json_dict["ndpi"] and json_dict["ndpi"]["proto"] available. + # ./examples/py-flow-info/flow-info.py --filter 'json_dict["source"] == "eth0"' --filter '"flow_event_name" in json_dict and json_dict["flow_event_name"] == "analyse"' + # Multiple JSON filter will be ANDed together. + # Note: You may *only* use the global "json_dict" in your expressions. + try: + json_filter = args.filter + if json_filter is not None: + for jf in json_filter: + nsock.addFilter(jf) + except AttributeError: + pass + global schema schema = {'packet_event_schema' : None, 'error_event_schema' : None, 'daemon_event_schema' : None, 'flow_event_schema' : None} diff --git a/examples/py-flow-info/flow-info.py b/examples/py-flow-info/flow-info.py index 5bd865a9d..7ef74c6a6 100755 --- a/examples/py-flow-info/flow-info.py +++ b/examples/py-flow-info/flow-info.py @@ -120,11 +120,26 @@ class Stats: flow_count += 1 current_flow = instances[alias][source].flows[flow_id] - flow_tot_l4_payload_len += current_flow.flow_src_tot_l4_payload_len + current_flow.flow_dst_tot_l4_payload_len - risky += 1 if len(current_flow.flow_risk) > 0 else 0 - midstream += 1 if current_flow.midstream != 0 else 0 - guessed += 1 if current_flow.guessed != 0 else 0 - not_detected = 1 if current_flow.not_detected != 0 else 0 + try: + flow_src_tot_l4_payload_len = current_flow.flow_src_tot_l4_payload_len + flow_dst_tot_l4_payload_len = current_flow.flow_dst_tot_l4_payload_len + flow_risk = current_flow.flow_risk + midstream = current_flow.midstream + guessed = current_flow.guessed + not_detected = current_flow.not_detected + except AttributeError: + flow_src_tot_l4_payload_len = 0 + flow_dst_tot_l4_payload_len = 0 + flow_risk = [] + midstream = 0 + guessed = 0 + not_detected = 0 + + flow_tot_l4_payload_len += flow_src_tot_l4_payload_len + flow_dst_tot_l4_payload_len + risky += 1 if len(flow_risk) > 0 else 0 + midstream += 1 if midstream != 0 else 0 + guessed += 1 if guessed != 0 else 0 + not_detected = 1 if not_detected != 0 else 0 return alias_count, source_count, flow_count, \ flow_tot_l4_payload_len, \ @@ -519,7 +534,7 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data): return True if __name__ == '__main__': - argparser = nDPIsrvd.defaultArgumentParser('Prettify and print events using the nDPIsrvd Python interface.') + argparser = nDPIsrvd.defaultArgumentParser('Prettify and print events using the nDPIsrvd Python interface.', True) argparser.add_argument('--no-color', action='store_true', default=False, help='Disable all terminal colors.') argparser.add_argument('--no-statusbar', action='store_true', default=False, @@ -577,6 +592,7 @@ if __name__ == '__main__': sys.stderr.write('Connecting to {} ..\n'.format(address[0]+':'+str(address[1]) if type(address) is tuple else address)) nsock = nDPIsrvdSocket() + nDPIsrvd.prepareJsonFilter(args, nsock) nsock.connect(address) nsock.timeout(1.0) stats = Stats(nsock) diff --git a/examples/py-json-stdout/json-stdout.py b/examples/py-json-stdout/json-stdout.py index f1aa51b5b..cde22cd9b 100755 --- a/examples/py-json-stdout/json-stdout.py +++ b/examples/py-json-stdout/json-stdout.py @@ -15,7 +15,7 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data): return True if __name__ == '__main__': - argparser = nDPIsrvd.defaultArgumentParser() + argparser = nDPIsrvd.defaultArgumentParser('Plain and simple nDPIsrvd JSON event printer with filter capabilities.', True) args = argparser.parse_args() address = nDPIsrvd.validateAddress(args) @@ -23,5 +23,6 @@ if __name__ == '__main__': sys.stderr.write('Connecting to {} ..\n'.format(address[0]+':'+str(address[1]) if type(address) is tuple else address)) nsock = nDPIsrvdSocket() + nDPIsrvd.prepareJsonFilter(args, nsock) nsock.connect(address) nsock.loop(onJsonLineRecvd, None, None) |