diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2020-08-13 09:49:14 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2020-08-13 09:49:14 +0200 |
commit | 3f783f9f0155d3fac096caba42365081a7ae8ec4 (patch) | |
tree | 206668e262427fa945b0fd13889af39f77141fc5 | |
parent | dcd206abfd296f8cfacaa63987972e9e6a01ec44 (diff) |
improved TCP-FIN/TCP-RST and TCP-keepalive/-idle timeout handling
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rw-r--r-- | config.h | 5 | ||||
-rwxr-xr-x | examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py | 161 | ||||
-rw-r--r-- | nDPId.c | 30 |
3 files changed, 175 insertions, 21 deletions
@@ -14,8 +14,9 @@ #define nDPId_MAX_IDLE_FLOWS_PER_THREAD 64 #define nDPId_TICK_RESOLUTION 1000 #define nDPId_MAX_READER_THREADS 4 -#define nDPId_IDLE_SCAN_PERIOD 10000 /* msec */ -#define nDPId_MAX_IDLE_TIME 300000 /* msec */ +#define nDPId_IDLE_SCAN_PERIOD 10000 /* 10 sec */ +#define nDPId_MAX_IDLE_TIME 7200000 /* 7200 sec */ +#define nDPId_MAX_POST_END_FLOW_TIME 60000 /* 60 sec */ #define nDPId_INITIAL_THREAD_HASH 0x03dd018b #define nDPId_MAX_PACKETS_PER_FLOW_TO_SEND 15 diff --git a/examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py b/examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py new file mode 100755 index 000000000..24e90bf3d --- /dev/null +++ b/examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 + +import base64 +import json +import re +import sys +import socket +import scapy.all + +HOST = '127.0.0.1' +PORT = 7000 +NETWORK_BUFFER_MIN_SIZE = 5 +NETWORK_BUFFER_MAX_SIZE = 8192 + +FLOWS = dict() + +class nDPIsrvdSocket: + def __init__(self, sock=None): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + def connect(self, host, port): + self.sock.connect((host, port)) + self.buffer = bytes() + self.msglen = 0 + self.digitlen = 0 + + def receive(self): + recvd = self.sock.recv(NETWORK_BUFFER_MAX_SIZE - len(self.buffer)) + + if recvd == '': + raise RuntimeError('socket connection broken') + self.buffer += recvd + + retval = [] + while self.msglen + self.digitlen < len(self.buffer): + + if self.msglen == 0: + starts_with_digits = re.match(r'(^\d+){', self.buffer[:NETWORK_BUFFER_MIN_SIZE].decode(errors='strict')) + if starts_with_digits is None: + if len(self.buffer) < NETWORK_BUFFER_MIN_SIZE: + break + raise RuntimeError('Invalid packet received: {}'.format(self.buffer)) + self.msglen = int(starts_with_digits[1]) + self.digitlen = len(starts_with_digits[1]) + + if len(self.buffer) >= self.msglen + self.digitlen: + recvd = self.buffer[self.digitlen:self.msglen + self.digitlen] + self.buffer = self.buffer[self.msglen + self.digitlen:] + retval += [(recvd,self.msglen,self.digitlen)] + + self.msglen = 0 + self.digitlen = 0 + + return retval + +class Flow: + def __init__(self, flow_id=-1): + self.pktdump = None + self.was_detected = False + self.flow_id = flow_id + self.packets = [] + + def addPacket(self, pkt): + self.packets += [pkt] + + def detected(self): + self.was_detected = True + + def fin(self): + if self.was_detected is True: + return + + if self.pktdump is None: + if self.flow_id == -1: + self.pktdump = scapy.all.PcapWriter('packet-undetected.pcap', append=True, sync=True) + else: + self.pktdump = scapy.all.PcapWriter('flow-undetected-{}.pcap'.format(self.flow_id), append=False, sync=True) + + for packet in self.packets: + self.pktdump.write(scapy.all.Raw(packet)) + + self.pktdump.close() + +def parse_json_str(json_str): + + try: + j = json.loads(json_str[0]) + except json.decoder.JSONDecodeError as exc: + raise RuntimeError('JSON Exception: {}\n\nJSON String: {}\n'.format(str(exc), str(json_str))) + + global FLOWS + + if 'flow_event_name' in j: + + event = j['flow_event_name'].lower() + flow_id = j['flow_id'] + + if event == 'new': + print('New flow with id {}.'.format(flow_id)) + FLOWS[flow_id] = Flow(flow_id) + elif flow_id not in FLOWS: + print('Ignore flow event with id {} as we did not get any flow-new event.'.format(flow_id)) + return + elif event == 'end' or event == 'idle': + if event == 'end': + print('End flow with id {}.'.format(flow_id)) + elif event == 'idle': + print('Idle flow with id {}.'.format(flow_id)) + FLOWS[flow_id].fin() + del FLOWS[flow_id] + elif event == 'detected': + FLOWS[flow_id].detected() + elif event == 'guessed' or event == 'not-detected': + if event == 'guessed': + print('Guessed flow with id {}.'.format(flow_id)) + else: + print('Not-detected flow with id {}.'.format(flow_id)) + else: + raise RuntimeError('unknown flow event name: {}'.format(event)) + + elif 'packet_event_name' in j: + + buffer_decoded = base64.b64decode(j['pkt'], validate=True) + + if j['packet_event_name'] == 'packet-flow': + + flow_id = j['flow_id'] + if flow_id not in FLOWS: + print('Ignore packet-flow event with id {} as we did not get any flow-new event.'.format(flow_id)) + return + + FLOWS[flow_id].addPacket(buffer_decoded) + + if j['packet_event_name'] == 'packet': + + flow = Flow() + flow.addPacket(buffer_decoded) + + +if __name__ == '__main__': + host = HOST + port = PORT + + if len(sys.argv) == 1: + sys.stderr.write('usage: {} [host] [port]\n'.format(sys.argv[0])) + if len(sys.argv) > 1: + host = sys.argv[1] + if len(sys.argv) > 2: + port = int(sys.argv[2]) + + sys.stderr.write('Recv buffer size: {}\n'.format(NETWORK_BUFFER_MAX_SIZE)) + sys.stderr.write('Connecting to {}:{} ..\n'.format(host, port)) + + nsock = nDPIsrvdSocket() + nsock.connect(host, port) + + while True: + received = nsock.receive() + for received_json_pkt in received: + parse_json_str(received_json_pkt) + @@ -58,10 +58,9 @@ struct nDPId_flow_info uint16_t dst_port; uint8_t is_midstream_flow : 1; - uint8_t flow_fin_ack_seen : 1; - uint8_t flow_ack_seen : 1; + uint8_t flow_fin_rst_seen : 1; uint8_t detection_completed : 1; - uint8_t reserved_00 : 4; + uint8_t reserved_00 : 5; uint8_t reserved_01[3]; uint8_t l4_protocol; @@ -482,7 +481,7 @@ static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int de if (which == ndpi_preorder || which == ndpi_leaf) { - if ((flow->flow_fin_ack_seen == 1 && flow->flow_ack_seen == 1) || + if ((flow->flow_fin_rst_seen == 1 && flow->last_seen + nDPId_MAX_POST_END_FLOW_TIME < workflow->last_time) || flow->last_seen + nDPId_MAX_IDLE_TIME < workflow->last_time) { char src_addr_str[INET6_ADDRSTRLEN + 1]; @@ -548,7 +547,11 @@ static void check_for_idle_flows(struct nDPId_reader_thread * const reader_threa jsonize_flow_event(reader_thread, f, FLOW_EVENT_NOT_DETECTED); } } - jsonize_flow_event(reader_thread, f, FLOW_EVENT_IDLE); + if (f->flow_fin_rst_seen != 0) { + jsonize_flow_event(reader_thread, f, FLOW_EVENT_END); + } else { + jsonize_flow_event(reader_thread, f, FLOW_EVENT_IDLE); + } ndpi_tdelete(f, &workflow->ndpi_flows_active[idle_scan_index], ndpi_workflow_node_cmp); ndpi_flow_info_freer(f); workflow->cur_active_flows--; @@ -1349,8 +1352,7 @@ static void ndpi_process_packet(uint8_t * const args, } tcp = (struct ndpi_tcphdr *)l4_ptr; flow.is_midstream_flow = (tcp->syn == 0 ? 1 : 0); - flow.flow_fin_ack_seen = (tcp->fin == 1 && tcp->ack == 1 ? 1 : 0); - flow.flow_ack_seen = tcp->ack; + flow.flow_fin_rst_seen = (tcp->fin == 1 || tcp->rst == 1 ? 1 : 0); flow.src_port = ntohs(tcp->source); flow.dst_port = ntohs(tcp->dest); } @@ -1569,8 +1571,6 @@ static void ndpi_process_packet(uint8_t * const args, flow_to_process->first_seen = time_ms; } flow_to_process->last_seen = time_ms; - /* current packet is an TCP-ACK? */ - flow_to_process->flow_ack_seen = flow.flow_ack_seen; if (l4_len > flow_to_process->max_l4_data_len) { @@ -1591,17 +1591,9 @@ static void ndpi_process_packet(uint8_t * const args, jsonize_packet_event(reader_thread, header, packet, flow_to_process, PACKET_EVENT_PAYLOAD_FLOW); /* TCP-FIN: indicates that at least one side wants to end the connection */ - if (flow.flow_fin_ack_seen != 0 && flow_to_process->flow_fin_ack_seen == 0) + if (flow.flow_fin_rst_seen != 0) { - flow_to_process->flow_fin_ack_seen = 1; - if (flow_to_process->detection_completed == 0) { - if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_to_process->guessed_protocol) != 0) { - jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_GUESSED); - } else { - jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_NOT_DETECTED); - } - } - jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_END); + flow_to_process->flow_fin_rst_seen = 1; return; } |