diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2021-02-16 20:37:29 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2021-02-16 20:43:00 +0100 |
commit | 893f43705132dfeb64dd33dc8697193f463708c0 (patch) | |
tree | c3748bb28b112ef8ad2519104373b55be2bfd5a9 | |
parent | 7218990e5826e4179fcb6360ef8e087fcc45467a (diff) |
Aligned nDPIsrvd.py to nDPIsrvd.h for consistency.
* Simplified Python interface as well.
* c-captured and flow-undetected-to-pcap.py produce similiar results
* Removed overloaded nDPIsrvd.py event structures.
* flow-info.py prints (with a color-hash) additional information e.g. alias/source and midstream
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rw-r--r-- | dependencies/nDPIsrvd.h | 4 | ||||
-rw-r--r-- | dependencies/nDPIsrvd.py | 237 | ||||
-rw-r--r-- | examples/c-captured/c-captured.c | 94 | ||||
-rwxr-xr-x | examples/py-flow-info/flow-info.py | 96 | ||||
-rwxr-xr-x | examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py | 76 | ||||
-rwxr-xr-x | examples/py-json-stdout/json-stdout.py | 17 | ||||
-rwxr-xr-x | examples/py-risky-flow-to-pcap/risky-flow-to-pcap.py | 67 |
7 files changed, 294 insertions, 297 deletions
diff --git a/dependencies/nDPIsrvd.h b/dependencies/nDPIsrvd.h index 75dcda021..ba75d3e9e 100644 --- a/dependencies/nDPIsrvd.h +++ b/dependencies/nDPIsrvd.h @@ -83,7 +83,7 @@ typedef nDPIsrvd_ull * nDPIsrvd_ull_ptr; struct nDPIsrvd_flow { - char id[nDPIsrvd_FLOW_ID_STRLEN]; + char id[nDPIsrvd_FLOW_ID_STRLEN]; // TODO: use alias and source for flow key as well nDPIsrvd_ull id_as_ull; UT_hash_handle hh; uint8_t flow_user_data[0]; @@ -528,7 +528,7 @@ static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket * TOKEN_VALUE_TO_ULL(flow_id, &flow->id_as_ull); snprintf(flow->id, nDPIsrvd_FLOW_ID_STRLEN, "%.*s", flow_id->value_length, flow_id->value); - HASH_ADD(hh, sock->flow_table, id,flow_id->value_length, flow); + HASH_ADD(hh, sock->flow_table, id, flow_id->value_length, flow); } return flow; diff --git a/dependencies/nDPIsrvd.py b/dependencies/nDPIsrvd.py index 4510658a4..53ed131e8 100644 --- a/dependencies/nDPIsrvd.py +++ b/dependencies/nDPIsrvd.py @@ -2,6 +2,7 @@ import argparse import array +import base64 import json import re import os @@ -25,19 +26,6 @@ NETWORK_BUFFER_MAX_SIZE = 12288 # Please keep this value in sync with the one in PKT_TYPE_ETH_IP4 = 0x0800 PKT_TYPE_ETH_IP6 = 0x86DD -EVENT_UNKNOWN = 'Unknown' -# Event tuple: (pretty-name, real-name) -DAEMON_EVENTS = [ ('Invalid','invalid'), ('Init','init'), \ - ('Reconnect','reconnect'), ('Shutdown','shutdown') ] -BASIC_EVENTS = ['Invalid', 'Unknown-Datalink-Layer', 'Unknown-Layer3-Protocol', 'Non-IP-Packet', - 'Ethernet-Packet-Too-Short', 'Ethernet-Packet-Unknown', 'IP4-Packet-Too-Short', - 'IP4-Size-Smaller-Than-Header', 'IP4-Layer4-Payload-Detection-Failed', 'IP6-Packet-Too-Short', - 'IP6-Size-Smaller-Than-Header', 'IP6-Layer4-Payload-Detection-Failed', 'TCP-Packet-Too-Short', - 'UDP-Packet-Too-Short', 'Capture-Size-Smaller-Than-Packet-Size', 'Max-Flow-To-Track', - 'Flow-Memory-Allocation-Failed'] -PACKET_EVENTS = [ ('Invalid','invalid'), ('Packet','packet'), ('Packet-Flow','packet-flow') ] -FLOW_EVENTS = [ ('Invalid','invalid'), ('New','new'), ('End','end'), ('Idle','idle'), ('Guessed','guessed'), \ - ('Detected','detected'), ('Detection-Update','detection-update'), ('Not-Detected','not-detected') ] class TermColor: HINT = '\033[33m' @@ -87,9 +75,42 @@ class TermColor: else: return '{}{}{}'.format(TermColor.BOLD, string, TermColor.END) +class Flow: + flow_id = -1 + +class FlowManager: + def __init__(self): + self.__flows = dict() + + def __buildFlowKey(self, json_dict): + if 'flow_id' not in json_dict or \ + 'alias' not in json_dict or \ + 'source' not in json_dict: + return None + + return str(json_dict['alias']) + str(json_dict['source']) + str(json_dict['flow_id']) + + def getFlow(self, json_dict): + event = json_dict['flow_event_name'].lower() if 'flow_event_name' in json_dict else '' + flow_key = self.__buildFlowKey(json_dict) + flow = None + + if flow_key is None: + return None + if flow_key not in self.__flows: + self.__flows[flow_key] = Flow() + self.__flows[flow_key].flow_id = int(json_dict['flow_id']) + flow = self.__flows[flow_key] + if event == 'end' or event == 'idle': + flow = self.__flows[flow_key] + del self.__flows[flow_key] + + return flow + class nDPIsrvdSocket: def __init__(self): self.sock_family = None + self.flow_mgr = FlowManager() def connect(self, addr): if type(addr) is tuple: @@ -104,18 +125,19 @@ class nDPIsrvdSocket: self.buffer = bytes() self.msglen = 0 self.digitlen = 0 + self.lines = [] def receive(self): if len(self.buffer) == NETWORK_BUFFER_MAX_SIZE: - raise RuntimeError('buffer capacity reached ({} bytes), check if it is in sync with nDPId\'s NETWORK_BUFFER_MAX_SIZE'.format(NETWORK_BUFFER_MAX_SIZE)) + raise RuntimeError('Buffer capacity reached ({} bytes), check if it is in sync with nDPId\'s NETWORK_BUFFER_MAX_SIZE.'.format(NETWORK_BUFFER_MAX_SIZE)) recvd = self.sock.recv(NETWORK_BUFFER_MAX_SIZE - len(self.buffer)) if len(recvd) == 0: - raise RuntimeError('socket connection broken') + raise RuntimeError('Socket connection broken.') self.buffer += recvd - retval = [] + new_data_avail = False while self.msglen + self.digitlen < len(self.buffer): if self.msglen == 0: @@ -130,23 +152,80 @@ class nDPIsrvdSocket: if len(self.buffer) >= self.msglen + self.digitlen: recvd = self.buffer[self.digitlen:self.msglen + self.digitlen] self.buffer = self.buffer[self.msglen + self.digitlen:] - retval += [(recvd,self.msglen,self.digitlen)] + self.lines += [(recvd,self.msglen,self.digitlen)] + new_data_avail = True self.msglen = 0 self.digitlen = 0 + return new_data_avail + + def parse(self, callback, global_user_data): + retval = True + index = 0 + for received_json_line in self.lines: + json_dict = json.loads(received_json_line[0].decode('ascii', errors='replace'), strict=True) + if callback(json_dict, self.flow_mgr.getFlow(json_dict), global_user_data) is not True: + retval = False + break + index += 1 + + self.lines = self.lines[index:] + return retval + def loop(self, callback, global_user_data): + while True: + if self.receive() > 0: + if self.parse(callback, global_user_data) is False: + sys.stderr.write('Callback returned False, abort.\n') + break; + class PcapPacket: - def __init__(self, flow_id=-1): + def __init__(self): self.pktdump = None - self.was_dumped = False - self.was_detected = False - self.flow_id = flow_id + self.flow_id = 0 self.packets = [] + self.__suffix = '' + self.__dump = False + self.__dumped = False - def addPacket(self, pkt, pkt_type, pkt_ipoffset): - self.packets += [ ( pkt, pkt_type, pkt_ipoffset ) ] + @staticmethod + def isInitialized(current_flow): + return current_flow is not None and hasattr(current_flow, 'pcap_packet') + + @staticmethod + def handleJSON(json_dict, current_flow): + if 'flow_event_name' in json_dict: + + if json_dict['flow_event_name'] == 'new': + + current_flow.pcap_packet = PcapPacket() + current_flow.pcap_packet.current_packet = 0 + current_flow.pcap_packet.max_packets = json_dict['flow_max_packets'] + current_flow.pcap_packet.flow_id = json_dict['flow_id'] + + elif PcapPacket.isInitialized(current_flow) is not True: + + pass + + elif json_dict['flow_event_name'] == 'end' or json_dict['flow_event_name'] == 'idle': + + try: + current_flow.pcap_packet.fin() + except RuntimeError: + pass + + elif PcapPacket.isInitialized(current_flow) is True and \ + ('packet_event_name' in json_dict and json_dict['packet_event_name'] == 'packet-flow' and current_flow.pcap_packet.flow_id > 0) or \ + ('packet_event_name' in json_dict and json_dict['packet_event_name'] == 'packet' and 'pkt' in json_dict): + + buffer_decoded = base64.b64decode(json_dict['pkt'], validate=True) + current_flow.pcap_packet.packets += [ ( buffer_decoded, json_dict['pkt_type'], json_dict['pkt_l3_offset'] ) ] + current_flow.pcap_packet.current_packet += 1 + + if current_flow.pcap_packet.current_packet != int(json_dict['flow_packet_id']): + raise RuntimeError('Packet IDs not in sync (local: {}, remote: {}).'.format(current_flow.pcap_packet.current_packet, int(json_dict['flow_packet_id']))) @staticmethod def getIp(packet): @@ -167,14 +246,17 @@ class PcapPacket: else: return None - def detected(self): - self.was_detected = True + def setSuffix(self, filename_suffix): + self.__suffix = filename_suffix + + def doDump(self): + self.__dump = True - def fin(self, filename_suffix): - if self.was_dumped is True: - return 'Flow already dumped.' - if self.was_detected is True: - return 'Flow detected.' + def fin(self): + if self.__dumped is True: + raise RuntimeError('Flow {} already dumped.'.format(self.flow_id)) + if self.__dump is False: + raise RuntimeError('Flow {} should not be dumped.'.format(self.flow_id)) emptyTCPorUDPcount = 0; for packet in self.packets: @@ -182,108 +264,27 @@ class PcapPacket: if p is not None: if p.haslayer(scapy.all.Padding) and len(p.payload) - len(p[scapy.all.Padding]) == 0: emptyTCPorUDPcount += 1 - if len(p.payload) == 0: + elif len(p.payload) == 0: emptyTCPorUDPcount += 1 if emptyTCPorUDPcount == len(self.packets): - return 'Flow does not contain any packets with non-empty layer4 payload.' + raise RuntimeError('Flow {} does not contain any packets({}) with non-empty layer4 payload.'.format(self.flow_id, len(self.packets))) if self.pktdump is None: - if self.flow_id == -1: - self.pktdump = scapy.all.PcapWriter('packet-{}.pcap'.format(filename_suffix), + if self.flow_id == 0: + self.pktdump = scapy.all.PcapWriter('packet-{}.pcap'.format(self.__suffix), append=True, sync=True) else: - self.pktdump = scapy.all.PcapWriter('flow-{}-{}.pcap'.format(filename_suffix, self.flow_id), + self.pktdump = scapy.all.PcapWriter('flow-{}-{}.pcap'.format(self.__suffix, self.flow_id), append=False, sync=True) for packet in self.packets: self.pktdump.write(PcapPacket.getIp(packet)) self.pktdump.close() - self.was_dumped = True - - return 'Success.' - -def JsonParseBytes(json_bytes): - return json.loads(json_bytes.decode('ascii', errors='replace'), strict=True) - -class nDPIdEvent: - isValid = False - DaemonEventID = -1 - DaemonEventName = None - DaemonEventPrettyName = EVENT_UNKNOWN - BasicEventID = -1 - BasicEventName = None - BasicEventPrettyName = EVENT_UNKNOWN - PacketEventID = -1 - PacketEventName = None - PacketEventPrettyName = EVENT_UNKNOWN - FlowEventID = -1 - FlowEventName = None - FlowEventPrettyName = EVENT_UNKNOWN - - def validateEvent(self, event_id, event_name, list_of_event_tuples): - if self.isValid is True: - raise RuntimeError('nDPId event already validated. Multiple Events in one JSON strings are not allowed.\n' \ - '[EVENTS]\n' - 'current: {}\n' \ - 'daemon.: {}\n' \ - 'basic..: {}\n' \ - 'packet.: {}\n' \ - 'flow...: {}\n'.format(event_name, - self.DaemonEventName, self.BasicEventName, \ - self.PacketEventName, self.FlowEventName)) - - if type(event_id) is not int: - raise RuntimeError('Argument is not an Integer/EventID!') - - if event_id < 0 or event_id >= len(list_of_event_tuples): - raise RuntimeError('Unknown event id: {} aka {}.'.format(event_id, event_name)) - - if type(list_of_event_tuples[0]) == tuple and list_of_event_tuples[event_id][1] != event_name: - raise RuntimeError('Unknown event name: {}.'.format(event_name)) - - self.isValid = True - return list_of_event_tuples[event_id][0] if type(list_of_event_tuples[0]) == tuple \ - else list_of_event_tuples[event_id] - - def validateFlowEvent(self): - return self.validateEvent(self.FlowEventID, self.FlowEventName, FLOW_EVENTS) - - def validatePacketEvent(self): - return self.validateEvent(self.PacketEventID, self.PacketEventName, PACKET_EVENTS) - - def validateBasicEvent(self): - return self.validateEvent(self.BasicEventID, self.BasicEventName, BASIC_EVENTS) - - def validateDaemonEvent(self): - return self.validateEvent(self.DaemonEventID, self.DaemonEventName, DAEMON_EVENTS) + self.__dumped = True - @staticmethod - def validateJsonEventTypes(json_dict): - if type(json_dict) is not dict: - raise RuntimeError('Argument is not a dictionary!') - - nev = nDPIdEvent() - - if 'daemon_event_id' in json_dict: - nev.DaemonEventID = json_dict['daemon_event_id'] - nev.DaemonEventName = json_dict['daemon_event_name'] - nev.DaemonEventPrettyName = nev.validateDaemonEvent() - if 'basic_event_id' in json_dict: - nev.BasicEventID = json_dict['basic_event_id'] - nev.BasicEventName = json_dict['basic_event_name'] - nev.BasicEventPrettyName = nev.validateBasicEvent() - if 'packet_event_id' in json_dict: - nev.PacketEventID = json_dict['packet_event_id'] - nev.PacketEventName = json_dict['packet_event_name'] - nev.PacketEventPrettyName = nev.validatePacketEvent() - if 'flow_event_id' in json_dict: - nev.FlowEventID = json_dict['flow_event_id'] - nev.FlowEventName = json_dict['flow_event_name'] - nev.FlowEventPrettyName = nev.validateFlowEvent() - - return nev + return True def defaultArgumentParser(): parser = argparse.ArgumentParser(description='nDPIsrvd options', formatter_class=argparse.ArgumentDefaultsHelpFormatter) diff --git a/examples/c-captured/c-captured.c b/examples/c-captured/c-captured.c index d0e60521d..a192acb31 100644 --- a/examples/c-captured/c-captured.c +++ b/examples/c-captured/c-captured.c @@ -10,6 +10,7 @@ #include <string.h> #include <sys/socket.h> #include <sys/types.h> +#include <time.h> #include <unistd.h> #include "nDPIsrvd.h" @@ -37,8 +38,6 @@ struct flow_user_data uint8_t detected; nDPIsrvd_ull flow_datalink; nDPIsrvd_ull flow_max_packets; - nDPIsrvd_ull flow_l4_header_len; - nDPIsrvd_ull flow_total_l4_payload_len; UT_array * packets; }; @@ -47,6 +46,11 @@ static int main_thread_shutdown = 0; static char const serv_listen_path[] = DISTRIBUTOR_UNIX_SOCKET; static char const serv_listen_addr[INET_ADDRSTRLEN] = DISTRIBUTOR_HOST; static uint16_t const serv_listen_port = DISTRIBUTOR_PORT; +#ifdef pcap_dump_open_append +static time_t pcap_filename_rotation = 600; +static time_t pcap_filename_last_rotation = 0; +static struct tm pcap_filename_last_rotation_tm = {}; +#endif static void packet_data_copy(void * dst, const void * src) { @@ -82,10 +86,41 @@ static char * generate_pcap_filename(struct nDPIsrvd_flow const * const flow, char * const dest, size_t size) { + char appendix[32] = {}; + +#ifdef pcap_dump_open_append + if (pcap_filename_rotation > 0) + { + time_t current_time = time(NULL); + + if (current_time >= pcap_filename_last_rotation + pcap_filename_rotation) + { + pcap_filename_last_rotation = current_time; + if (localtime_r(&pcap_filename_last_rotation, &pcap_filename_last_rotation_tm) == NULL) + { + return NULL; + } + } + + if (strftime(appendix, sizeof(appendix), "%d_%m_%y-%H_%M_%S", &pcap_filename_last_rotation_tm) == 0) + { + return NULL; + } + } else +#endif + { + if (snprintf(appendix, sizeof(appendix), "%s", flow->id) <= 0) + { + return NULL; + } + } + if (flow_user->guessed != 0 || flow_user->detected == 0) { int ret = - snprintf(dest, size, "flow-%s-%s.pcap", (flow_user->guessed != 0 ? "guessed" : "undetected"), flow->id); + snprintf(dest, size, "flow-%s-%s.pcap", + (flow_user->guessed != 0 ? "guessed" : "undetected"), + appendix); if (ret <= 0 || (size_t)ret > size) { return NULL; @@ -119,7 +154,12 @@ static int packet_write_pcap_file(UT_array const * const pd_array, int pkt_datal { return 1; } + +#ifdef pcap_dump_open_append + pcap_dumper_t * pd = pcap_dump_open_append(p, filename); +#else pcap_dumper_t * pd = pcap_dump_open(p, filename); +#endif if (pd == NULL) { fprintf(stderr, "pcap error %s\n", pcap_geterr(p)); @@ -258,6 +298,9 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock nDPIsrvd_ull pkt_l4_len = 0ull; perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_l4_len"), &pkt_l4_len), "pkt_l4_len"); + nDPIsrvd_ull pkt_l4_offset = 0ull; + perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_l4_offset"), &pkt_l4_offset), "pkt_l4_offset"); + struct packet_data pd = { .packet_ts_sec = pkt_ts_sec, .packet_ts_usec = pkt_ts_usec, @@ -266,31 +309,16 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock .base64_packet_const = pkt->value }; utarray_push_back(flow_user->packets, &pd); - flow_user->flow_total_l4_payload_len += pkt_l4_len - flow_user->flow_l4_header_len; } { struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name"); if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "new") != 0) { + flow_user->flow_new_seen = 1; perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "flow_datalink"), &flow_user->flow_datalink), "flow_datalink"); perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "flow_max_packets"), &flow_user->flow_max_packets), "flow_max_packets"); - struct nDPIsrvd_json_token const * const l4_proto = TOKEN_GET_SZ(sock, "l4_proto"); - if (TOKEN_VALUE_EQUALS_SZ(l4_proto, "tcp") != 0) - { - flow_user->flow_l4_header_len = sizeof(struct tcphdr); - } else if (TOKEN_VALUE_EQUALS_SZ(l4_proto, "udp") != 0) - { - flow_user->flow_l4_header_len = sizeof(struct udphdr); - } else if (TOKEN_VALUE_EQUALS_SZ(l4_proto, "icmp") != 0 || - TOKEN_VALUE_EQUALS_SZ(l4_proto, "icmp6") != 0) - { - flow_user->flow_l4_header_len = sizeof(struct icmphdr); - } - - flow_user->flow_new_seen = 1; - return CALLBACK_OK; } else if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "guessed") != 0) { @@ -329,7 +357,6 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock (flow_user->guessed != 0 || flow_user->detected == 0)) { packet_data_print(flow_user->packets); - if (flow_user->flow_total_l4_payload_len > 0) { char pcap_filename[64]; if (generate_pcap_filename(flow, flow_user, pcap_filename, sizeof(pcap_filename)) == NULL) @@ -342,8 +369,6 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock { return CALLBACK_ERROR; } - } else { - printf("flow %s: captured packets do not have any l4 payload\n", flow->id); } utarray_free(flow_user->packets); @@ -376,6 +401,31 @@ static void captured_flow_end_callback(struct nDPIsrvd_socket * const sock, stru } } +// TODO: argv parsing +#if 0 +static int parse_options(int argc, char ** argv) +{ + int opt; + + static char const usage[] = + "Usage: %s " + "[-d] [-s host] [-R rotate-every-n-seconds] [-g] [-u]\n"; + + while ((opt = getopt(argc, argv, "hds:R:gu")) != -1) + { + } + + if (optind < argc) + { + fprintf(stderr, "Unexpected argument after options\n\n"); + fprintf(stderr, usage, argv[0]); + return 1; + } + + return 0; +} +#endif + int main(int argc, char ** argv) { sock = nDPIsrvd_init(0, sizeof(struct flow_user_data), captured_json_callback, captured_flow_end_callback); diff --git a/examples/py-flow-info/flow-info.py b/examples/py-flow-info/flow-info.py index c13e9cead..62e43f74b 100755 --- a/examples/py-flow-info/flow-info.py +++ b/examples/py-flow-info/flow-info.py @@ -7,34 +7,33 @@ sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies') import nDPIsrvd from nDPIsrvd import nDPIsrvdSocket, TermColor - -def parse_json_str(json_str): - - j = nDPIsrvd.JsonParseBytes(json_str[0]) - nDPIdEvent = nDPIsrvd.nDPIdEvent.validateJsonEventTypes(j) - if nDPIdEvent.isValid is False: - raise RuntimeError('Missing event id or event name invalid in the JSON string: {}'.format(j)) - if nDPIdEvent.BasicEventID != -1: - print('{:>21}: {}'.format(TermColor.WARNING + TermColor.BLINK + 'BASIC-EVENT' + TermColor.END, - nDPIdEvent.BasicEventPrettyName)) - return - elif nDPIdEvent.FlowEventID == -1: - return +def prettifyEvent(color_list, whitespaces, text): + term_attrs = str() + for color in color_list: + term_attrs += str(color) + return '{}{:>' + str(whitespaces) + '}{}'.format(term_attrs, text, TermColor.END) + +def onJsonLineRecvd(json_dict, current_flow, global_user_data): + if 'basic_event_id' in json_dict: + print('{}: {}'.format(prettifyEvent([TermColor.WARNING, TermColor.BLINK], 16, 'BASIC-EVENT'), json_dict['basic_event_name'])) + return True + elif 'flow_event_id' not in json_dict: + return True ndpi_proto_categ = '' ndpi_frisk = '' - if 'ndpi' in j: - if 'proto' in j['ndpi']: - ndpi_proto_categ += '[' + str(j['ndpi']['proto']) + ']' + if 'ndpi' in json_dict: + if 'proto' in json_dict['ndpi']: + ndpi_proto_categ += '[' + str(json_dict['ndpi']['proto']) + ']' - if 'category' in j['ndpi']: - ndpi_proto_categ += '[' + str(j['ndpi']['category']) + ']' + if 'category' in json_dict['ndpi']: + ndpi_proto_categ += '[' + str(json_dict['ndpi']['category']) + ']' - if 'flow_risk' in j['ndpi']: + if 'flow_risk' in json_dict['ndpi']: cnt = 0 - for key in j['ndpi']['flow_risk']: - ndpi_frisk += str(j['ndpi']['flow_risk'][key]) + ', ' + for key in json_dict['ndpi']['flow_risk']: + ndpi_frisk += str(json_dict['ndpi']['flow_risk'][key]) + ', ' cnt += 1 ndpi_frisk = '{}: {}'.format( TermColor.WARNING + TermColor.BOLD + 'RISK' + TermColor.END if cnt < 2 @@ -42,39 +41,43 @@ def parse_json_str(json_str): ndpi_frisk[:-2]) instance_and_source = '' - instance_and_source += '[{}]'.format(TermColor.setColorByString(j['alias'])) - instance_and_source += '[{}]'.format(TermColor.setColorByString(j['source'])) + instance_and_source += '[{}]'.format(TermColor.setColorByString(json_dict['alias'])) + instance_and_source += '[{}]'.format(TermColor.setColorByString(json_dict['source'])) + line_suffix = '' flow_event_name = '' - if nDPIdEvent.FlowEventName == 'guessed' or nDPIdEvent.FlowEventName == 'undetected': - flow_event_name += '{}{:>16}{}'.format(TermColor.HINT, nDPIdEvent.FlowEventPrettyName, TermColor.END) + if json_dict['flow_event_name'] == 'guessed' or json_dict['flow_event_name'] == 'not-detected': + flow_event_name += '{}{:>16}{}'.format(TermColor.HINT, json_dict['flow_event_name'], TermColor.END) else: - flow_event_name += '{:>16}'.format(nDPIdEvent.FlowEventPrettyName) + if json_dict['flow_event_name'] == 'new' and json_dict['midstream'] != 0: + line_suffix = '[{}]'.format(TermColor.WARNING + TermColor.BLINK + 'MIDSTREAM' + TermColor.END) + flow_event_name += '{:>16}'.format(json_dict['flow_event_name']) - if j['l3_proto'] == 'ip4': - print('{} {}: [{:.>6}] [{}][{:.>5}] [{:.>15}]{} -> [{:.>15}]{} {}' \ + if json_dict['l3_proto'] == 'ip4': + print('{} {}: [{:.>6}] [{}][{:.>5}] [{:.>15}]{} -> [{:.>15}]{} {}{}' \ ''.format(instance_and_source, flow_event_name, - j['flow_id'], j['l3_proto'], j['l4_proto'], - j['src_ip'].lower(), - '[{:.>5}]'.format(j['src_port']) if 'src_port' in j else '', - j['dst_ip'].lower(), - '[{:.>5}]'.format(j['dst_port']) if 'dst_port' in j else '', - ndpi_proto_categ)) - elif j['l3_proto'] == 'ip6': - print('{} {}: [{:.>6}] [{}][{:.>5}] [{:.>39}]{} -> [{:.>39}]{} {}' \ + json_dict['flow_id'], json_dict['l3_proto'], json_dict['l4_proto'], + json_dict['src_ip'].lower(), + '[{:.>5}]'.format(json_dict['src_port']) if 'src_port' in json_dict else '', + json_dict['dst_ip'].lower(), + '[{:.>5}]'.format(json_dict['dst_port']) if 'dst_port' in json_dict else '', + ndpi_proto_categ, line_suffix)) + elif json_dict['l3_proto'] == 'ip6': + print('{} {}: [{:.>6}] [{}][{:.>5}] [{:.>39}]{} -> [{:.>39}]{} {}{}' \ ''.format(instance_and_source, flow_event_name, - j['flow_id'], j['l3_proto'], j['l4_proto'], - j['src_ip'].lower(), - '[{:.>5}]'.format(j['src_port']) if 'src_port' in j else '', - j['dst_ip'].lower(), - '[{:.>5}]'.format(j['dst_port']) if 'dst_port' in j else '', - ndpi_proto_categ)) + json_dict['flow_id'], json_dict['l3_proto'], json_dict['l4_proto'], + json_dict['src_ip'].lower(), + '[{:.>5}]'.format(json_dict['src_port']) if 'src_port' in json_dict else '', + json_dict['dst_ip'].lower(), + '[{:.>5}]'.format(json_dict['dst_port']) if 'dst_port' in json_dict else '', + ndpi_proto_categ, line_suffix)) else: - raise RuntimeError('unsupported l3 protocol: {}'.format(j['l3_proto'])) + raise RuntimeError('unsupported l3 protocol: {}'.format(json_dict['l3_proto'])) if len(ndpi_frisk) > 0: print('{} {:>18}{}'.format(instance_and_source, '', ndpi_frisk)) + return True if __name__ == '__main__': argparser = nDPIsrvd.defaultArgumentParser() @@ -86,9 +89,4 @@ if __name__ == '__main__': nsock = nDPIsrvdSocket() nsock.connect(address) - - while True: - received = nsock.receive() - for received_json_pkt in received: - parse_json_str(received_json_pkt) - + nsock.loop(onJsonLineRecvd, None) diff --git a/examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py b/examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py index f04e4d955..961adc3cf 100755 --- a/examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py +++ b/examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 -import base64 import os import sys @@ -8,56 +7,44 @@ sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies') import nDPIsrvd from nDPIsrvd import TermColor, nDPIsrvdSocket, PcapPacket -FLOWS = dict() +def onJsonLineRecvd(json_dict, current_flow, global_user_data): + if current_flow is None: -def parse_json_str(json_str): + if 'packet_event_name' in json_dict and json_dict['packet_event_name'] == 'packet': + fake_flow = Flow() + fake_flow.pkt = PcapPacket() + PcapPacket.handleJSON(json_dict, fake_flow) + fake_flow.pkt.doDump() + fake_flow.pkt.setSuffix('packet_undetected') + fake_flow.pkt.fin() - j = nDPIsrvd.JsonParseBytes(json_str[0]) + return True - global FLOWS + PcapPacket.handleJSON(json_dict, current_flow) - if 'flow_event_name' in j: + if 'flow_event_name' in json_dict and PcapPacket.isInitialized(current_flow) and \ + (json_dict['flow_event_name'] == 'guessed' or json_dict['flow_event_name'] == 'not-detected'): - event = j['flow_event_name'].lower() - flow_id = j['flow_id'] + current_flow.pcap_packet.doDump() + if json_dict['flow_event_name'] == 'guessed': + current_flow.pcap_packet.setSuffix('guessed') - if 'midstream' in j and j['midstream'] == 1: - return + try: + if current_flow.pcap_packet.fin() is True: + print('Guessed flow with id {}, dumped'.format(current_flow.flow_id)) + except RuntimeError as err: + print('Guessed flow with id {} excepted: {}'.format(current_flow.flow_id, str(err))) - if event == 'new': - FLOWS[flow_id] = PcapPacket(flow_id) - elif flow_id not in FLOWS: - return - elif event == 'end' or event == 'idle': - del FLOWS[flow_id] - elif event == 'detected' or event == 'detection-update': - FLOWS[flow_id].detected() - elif event == 'guessed' or event == 'not-detected': - if event == 'guessed': - print('Guessed flow with id {}, PCAP dump returned: {}'.format(flow_id, FLOWS[flow_id].fin('guessed'))) - else: - print('Not-detected flow with id {}: PCAP dump returned {}'.format(flow_id, FLOWS[flow_id].fin('undetected'))) else: - raise RuntimeError('unknown flow event name: {}'.format(event)) + current_flow.pcap_packet.setSuffix('undetected') - elif 'packet_event_name' in j: - - buffer_decoded = base64.b64decode(j['pkt'], validate=True) - - if j['packet_event_name'] == 'packet-flow': - - flow_id = j['flow_id'] - - if flow_id not in FLOWS: - return - - FLOWS[flow_id].addPacket(buffer_decoded, j['pkt_type'], j['pkt_l3_offset']) - - if j['packet_event_name'] == 'packet': - - flow = PcapPacket() - flow.addPacket(buffer_decoded, j['pkt_type'], j['pkt_l3_offset']) + try: + if current_flow.pcap_packet.fin() is True: + print('Not-detected flow with id {}, dumped'.format(current_flow.flow_id)) + except RuntimeError as err: + print('Not-detected flow with id {} excepted: {}'.format(current_flow.flow_id, str(err))) + return True if __name__ == '__main__': argparser = nDPIsrvd.defaultArgumentParser() @@ -69,9 +56,4 @@ if __name__ == '__main__': nsock = nDPIsrvdSocket() nsock.connect(address) - - while True: - received = nsock.receive() - for received_json_pkt in received: - parse_json_str(received_json_pkt) - + nsock.loop(onJsonLineRecvd, None) diff --git a/examples/py-json-stdout/json-stdout.py b/examples/py-json-stdout/json-stdout.py index ac94d707f..9f58d161e 100755 --- a/examples/py-json-stdout/json-stdout.py +++ b/examples/py-json-stdout/json-stdout.py @@ -8,13 +8,9 @@ import nDPIsrvd from nDPIsrvd import nDPIsrvdSocket, TermColor -def parse_json_str(json_str): - - j = nDPIsrvd.JsonParseBytes(json_str[0]) - nDPIdEvent = nDPIsrvd.nDPIdEvent.validateJsonEventTypes(j) - if nDPIdEvent.isValid is False: - raise RuntimeError('Missing event id or event name invalid in the JSON string: {}'.format(j)) - print(j) +def onJsonLineRecvd(json_dict, current_flow, global_user_data): + print(json_dict) + return True if __name__ == '__main__': argparser = nDPIsrvd.defaultArgumentParser() @@ -26,9 +22,4 @@ if __name__ == '__main__': nsock = nDPIsrvdSocket() nsock.connect(address) - - while True: - received = nsock.receive() - for received_json_pkt in received: - parse_json_str(received_json_pkt) - + nsock.loop(onJsonLineRecvd, None) diff --git a/examples/py-risky-flow-to-pcap/risky-flow-to-pcap.py b/examples/py-risky-flow-to-pcap/risky-flow-to-pcap.py index 1a07e2e90..33c0be810 100755 --- a/examples/py-risky-flow-to-pcap/risky-flow-to-pcap.py +++ b/examples/py-risky-flow-to-pcap/risky-flow-to-pcap.py @@ -8,52 +8,32 @@ sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies') import nDPIsrvd from nDPIsrvd import TermColor, nDPIsrvdSocket, PcapPacket -FLOWS = dict() +def onJsonLineRecvd(json_dict, current_flow, global_user_data): + if current_flow is None: + return True -def parse_json_str(json_str): + PcapPacket.handleJSON(json_dict, current_flow) - j = nDPIsrvd.JsonParseBytes(json_str[0]) + if 'flow_event_name' in json_dict and PcapPacket.isInitialized(current_flow) and \ + 'ndpi' in json_dict and 'flow_risk' in json_dict['ndpi'] and not hasattr(current_flow, 'is_risky_flow'): - global FLOWS + current_flow.pcap_packet.doDump() + current_flow.pcap_packet.setSuffix('risky') + current_flow.is_risky_flow = True + print('Risky flow with id {} marked for dumping.'.format(current_flow.flow_id)) - if 'flow_event_name' in j: + if hasattr(current_flow, 'is_risky_flow') and \ + (current_flow.pcap_packet.current_packet < current_flow.pcap_packet.max_packets or \ + ('flow_event_name' in json_dict and \ + (json_dict['flow_event_name'] == 'end' or json_dict['flow_event_name'] == 'idle'))): - event = j['flow_event_name'].lower() - flow_id = j['flow_id'] - - if 'midstream' in j and j['midstream'] == 1: - return - - if event == 'new': - FLOWS[flow_id] = PcapPacket(flow_id) - elif flow_id not in FLOWS: - return - elif event == 'end' or event == 'idle': - del FLOWS[flow_id] - elif event == 'detected' or event == 'detection-update' or event == 'guessed' or event == 'not-detected': - if 'ndpi' in j and 'flow_risk' in j['ndpi']: - print('Risky flow with id {}, PCAP dump returned: {}'.format(flow_id, FLOWS[flow_id].fin('risky'))) - else: - raise RuntimeError('unknown flow event name: {}'.format(event)) - - elif 'packet_event_name' in j: - - buffer_decoded = base64.b64decode(j['pkt'], validate=True) - - if j['packet_event_name'] == 'packet-flow': - - flow_id = j['flow_id'] - - if flow_id not in FLOWS: - return - - FLOWS[flow_id].addPacket(buffer_decoded, j['pkt_type'], j['pkt_l3_offset']) - - if j['packet_event_name'] == 'packet': - - flow = PcapPacket() - flow.addPacket(buffer_decoded, j['pkt_type'], j['pkt_l3_offset']) + try: + if current_flow.pcap_packet.fin() is True: + print('Risky flow with id {} dumped.'.format(current_flow.flow_id)) + except RuntimeError as err: + pass + return True if __name__ == '__main__': argparser = nDPIsrvd.defaultArgumentParser() @@ -65,9 +45,4 @@ if __name__ == '__main__': nsock = nDPIsrvdSocket() nsock.connect(address) - - while True: - received = nsock.receive() - for received_json_pkt in received: - parse_json_str(received_json_pkt) - + nsock.loop(onJsonLineRecvd, None) |