diff options
author | lns <matzeton@googlemail.com> | 2022-06-13 14:36:21 +0200 |
---|---|---|
committer | lns <matzeton@googlemail.com> | 2022-06-13 14:36:21 +0200 |
commit | 13de32fa34acf2e494af8c9e15aa0b8bb6be0a4a (patch) | |
tree | aacbdc6e7e0682484cc3dc98975e636edd106164 |
Initial commit.
Signed-off-by: lns <matzeton@googlemail.com>
-rw-r--r-- | README.md | 28 | ||||
-rwxr-xr-x | TCPSplit.py | 208 | ||||
-rw-r--r-- | TCPState.py | 984 | ||||
-rw-r--r-- | TCPStream.py | 332 | ||||
-rw-r--r-- | TCPStreamExtractor.py | 237 | ||||
-rw-r--r-- | example.pcapng | bin | 0 -> 2952 bytes |
6 files changed, 1789 insertions, 0 deletions
diff --git a/README.md b/README.md new file mode 100644 index 0000000..c379a1c --- /dev/null +++ b/README.md @@ -0,0 +1,28 @@ +tcp-split +========= + +This project is mostly used for my personal research and testing purposes. + +Split TCP segments of a stream into smaller ones using Scapy and PCAP files. +Inspired and Copy&Paste from [scapy-tcp-extractor](https://github.com/deeso/scapy-tcp-extractor). + +```shell +usage: TCPSplit.py [-h] [-o OUTPUT] [-s] [-l LENGTH] input + +positional arguments: + input PCAP input file + +options: + -h, --help show this help message and exit + -o OUTPUT, --output OUTPUT + PCAP output file + -s, --summary Print found TCP Streams to stdout + -l LENGTH, --length LENGTH + Split TCP payload every n bytes +``` + +You can use the `example.pcapng` which contains two TCP Streams with some ASCII content by typing: + +`./TCPSplit.py -o ./splitted.pcap -l1 -s ./example.pcapng` + +This will print a summary of all found TCP streams and split the TCP segments into segments of 1 byte size. The resulting `splitted.pcap` file should contain valid TCP streams. diff --git a/TCPSplit.py b/TCPSplit.py new file mode 100755 index 0000000..e85a41f --- /dev/null +++ b/TCPSplit.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python3 + +""" +Follow/Split TCP streams with scapy. 
+""" + +import argparse +import scapy +import scapy.all +import sys +import time + +import TCPStream +import TCPStreamExtractor + + +MAX_BYTES_PER_PACKET = 2 + + +class TCPSplitStreamException(Exception): + + def __init__(self, msg): + super(Exception, self).__init__(msg) + + +class TCPSplitData(object): + + def __init__(self, pkt_timestamp, pkt_direction, pkt_payload): + self.time = pkt_timestamp + self.direction = pkt_direction # False if src->dst, True otherwise + self.payload = pkt_payload + + +class TCPSplitStream(object): + + def __init__(self, tcp_stream: TCPStream.TCPStream): + if not isinstance(tcp_stream, TCPStream.TCPStream): + raise TypeError('Got type ' + str(type(tcp_stream)) + + ', expected ' + str(TCPStream.TCPStream)) + self.stream = tcp_stream + self.ordered_pkts = self.stream.get_order_pkts() + + def __generate_handshake(self): + self.seq = self.ordered_pkts[0][scapy.all.TCP].seq # TCP-SYN + self.ack = self.ordered_pkts[1][scapy.all.TCP].seq # TCP-SYN-ACK + self.ip2dst = scapy.all.IP(src = self.stream.src, dst = self.stream.dst) + self.ip2src = scapy.all.IP(src = self.stream.dst, dst = self.stream.src) + + syn = scapy.all.TCP(sport = self.stream.sport, dport = self.stream.dport, + flags = 'S', seq = self.seq, ack = 0) + self.seq += 1 + synack = scapy.all.TCP(sport = self.stream.dport, dport = self.stream.sport, + flags = 'SA', seq = self.ack, ack = self.seq) + self.ack += 1 + ack = scapy.all.TCP(sport = self.stream.sport, dport = self.stream.dport, + flags = 'A', seq = self.seq, ack = self.ack) + + handshake = list() + handshake += self.ip2dst/syn + handshake += self.ip2src/synack + handshake += self.ip2dst/ack + handshake[0].time = self.ordered_pkts[0].time # time(TCP-SYN) + handshake[1].time = self.ordered_pkts[1].time # time(TCP-SYN-ACK) + handshake[2].time = self.ordered_pkts[2].time # time(TCP-ACK) + return handshake + + def __generate_payload(self, split_data, bytes_per_packet): + packets = list() + + for i in range(0, 
len(split_data.payload), bytes_per_packet): + payload = split_data.payload[i:i + bytes_per_packet] + if split_data.direction is False: # reverse direction e.g. server -> client? + iphdr_src = self.ip2src + iphdr_dst = self.ip2dst + tcphdr_sport = self.stream.sport + tcphdr_dport = self.stream.dport + tcphdr_seq = self.seq + tcphdr_ack = self.ack + else: + iphdr_src = self.ip2dst + iphdr_dst = self.ip2src + tcphdr_sport = self.stream.dport + tcphdr_dport = self.stream.sport + tcphdr_seq = self.ack + tcphdr_ack = self.seq + + iphdr = iphdr_dst + tcphdr = scapy.all.TCP(sport = tcphdr_sport, dport = tcphdr_dport, + flags = 'PA', seq = tcphdr_seq, ack = tcphdr_ack) + p = iphdr/tcphdr/scapy.all.Raw(load = payload) + p.time = split_data.time + packets += p + tcphdr_seq += len(payload) + + iphdr = iphdr_src + tcphdr = scapy.all.TCP(sport = tcphdr_dport, dport = tcphdr_sport, + flags = 'A', seq = tcphdr_ack, ack = tcphdr_seq) + p = iphdr/tcphdr + p.time = split_data.time + packets += p + + if split_data.direction is False: + self.seq = tcphdr_seq + self.ack = tcphdr_ack + else: + self.seq = tcphdr_ack + self.ack = tcphdr_seq + + return packets + + def __generate_finish(self, split_data): + packets = list() + + # We are always closing the connection from the client side for now. 
+ iphdr = self.ip2dst + tcphdr = scapy.all.TCP(sport = self.stream.sport, dport = self.stream.dport, + flags = 'FA', seq = self.seq, ack = self.ack) + p = iphdr/tcphdr + p.time = split_data.time + packets += p + self.seq += 1 + + iphdr = self.ip2src + tcphdr = scapy.all.TCP(sport = self.stream.dport, dport = self.stream.sport, + flags = 'FA', seq = self.ack, ack = self.seq) + p = iphdr/tcphdr + p.time = split_data.time + packets += p + self.ack += 1 + + iphdr = self.ip2dst + tcphdr = scapy.all.TCP(sport = self.stream.sport, dport = self.stream.dport, + flags = 'A', seq = self.seq, ack = self.ack) + p = iphdr/tcphdr + p.time = split_data.time + packets += p + + return packets + + @staticmethod + def build_stream_key(pkt): + return str(pkt['IP'].src) + ':' + str(pkt['TCP'].sport) + \ + ' -> ' + \ + str(pkt['IP'].dst) + ':' + str(pkt['TCP'].dport) + + def split(self): + data = [] + seq_nos = {} + for pkt in self.ordered_pkts: + tcpp = pkt[scapy.all.TCP] + seq = tcpp.seq + key = TCPSplitStream.build_stream_key(pkt) + if not key in seq_nos: + seq_nos[key] = [] + + if seq in seq_nos[key]: + # retransmit + continue + + if scapy.all.Raw in tcpp: + data.append(TCPSplitData(pkt.time, False if pkt.sport == self.stream.sport else True, + tcpp[scapy.all.Raw].load)) + seq_nos[key].append(seq) + + tcp_stream = self.__generate_handshake() + + data.sort(key = lambda tcp_split_data: tcp_split_data.time) + for payload_packet in data: + tcp_stream += self.__generate_payload(payload_packet, MAX_BYTES_PER_PACKET) + + tcp_stream += self.__generate_finish(data[-1:][0]) + + return tcp_stream + + +def printStreams(tse): + print('TCP Streams found:') + for stream in tse.summary(): + print('\t' + stream) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('input', type=str, help='PCAP input file') + parser.add_argument('-o', '--output', type=str, help='PCAP output file', + default=None) + parser.add_argument('-s', '--summary', action="store_true", + 
help='Print found TCP Streams to stdout', + default=False) + parser.add_argument('-l', '--length', type=int, + help='Split TCP payload every n bytes', + default=MAX_BYTES_PER_PACKET) + args = parser.parse_args() + + tse = TCPStreamExtractor.TCPStreamExtractor(args.input) + if args.summary is True: + printStreams(tse) + + MAX_BYTES_PER_PACKET = args.length + + all_streams = list() + for session in tse.fwd_flows: + stream = tse.streams[session] + tss = TCPSplitStream(stream) + all_streams += tss.split() + + if args.output is not None: + scapy.all.wrpcap(args.output, all_streams) diff --git a/TCPState.py b/TCPState.py new file mode 100644 index 0000000..2c936d9 --- /dev/null +++ b/TCPState.py @@ -0,0 +1,984 @@ +""" +TCP stream extraction using Scapy. + +(c) Praetorian +Author: Adam Pridgen <adam.pridgen@praetorian.com> || <adam.pridgen@thecoverofnight.com> + +This program is free software; you can redistribute it and/or modify it +under the terms of the GNU General Public License as published by the Free +Software Foundation; either version 3, or (at your option) any later +version. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +more details. + +You should have received a copy of the GNU General Public License along +with this program; see the file COPYING. If not, write to the Free +Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +02110-1301, USA. 
+ +Description: tracks TCPStream state between a client and server + +""" +from scapy.all import * +from random import randint + + +is_syn_pkt = lambda pkt: 'TCP' in pkt and pkt['TCP'].flags == TCP_FLAGS['S'] +is_synack_pkt = lambda pkt: 'TCP' in pkt and pkt['TCP'].flags == (TCP_FLAGS['S'] | TCP_FLAGS['A']) + +create_pkt_flow = lambda pkt: "%s:%s ==> %s:%s"%(pkt['IP'].src,str(pkt['IP'].sport),pkt['IP'].dst,str(pkt['IP'].dport)) + +create_forward_flow = lambda pkt: "%s:%s ==> %s:%s"%(pkt['IP'].src,str(pkt['IP'].sport),pkt['IP'].dst,str(pkt['IP'].dport)) + +create_reverse_flow = lambda pkt: "%s:%s ==> %s:%s"%(pkt['IP'].dst,str(pkt['IP'].dport),pkt['IP'].src,str(pkt['IP'].sport)) + + +create_flow = create_forward_flow + + +TCP_FLAGS = {"F":0x1, "S":0x2, "R":0x4, "P":0x8, + "A":0x10, "U":0x20, "E":0x40, "C":0x80, + 0x1:"F", 0x2:"S", 0x4:"R", 0x8:"P", + 0x10:"A", 0x20:"U", 0x40:"E", 0x80:"C"} + +TCP_STATES = {"LISTEN":{'S':["SYN_RCVD", 'SA']}, + "SYN_SENT":{'SA':["ESTABLISHED", 'A'],'S':["SYN_RCVD", 'SA'],}, + "SYN_RCVD":{'F':["FIN_WAIT_1", 'A'],'A':["ESTABLISHED", ''],'R':["LISTEN", ''],}, + "LAST_ACK":{}, + "CLOSE_WAIT":{"":["LAST_ACK","F"]}, # initiated by the server + "LAST_ACK":{"A":["CLOSED",""]}, + "ESTABLISHED":{"F":["FIN_WAIT_1",""],}, + "FIN_WAIT_1":{"A":["FIN_WAIT_2",""],"F":["CLOSED","A"],"FA":["TIME_WAIT","A"],}, + "FIN_WAIT_2":{"F":["TIME_WAIT","A"],}, + "CLOSED":{"A":["TIME_WAIT", ""]},} + + +flags_equal = lambda pkt, flag: pkt['TCP'].flags == flag +flags_set = lambda pkt, flag: (pkt['TCP'].flags & flag) != 0 + +class TCPStateMachine: + def __init__(self, pkt=None): + if not pkt is None: + self.init(pkt) + + def init(self, pkt): + if not 'TCP' in pkt: + raise Exception("Not a TCP Packet") + if not is_syn_pkt(pkt): + raise Exception("Not valid SYN") + + self.flows = set((create_forward_flow(pkt), create_reverse_flow(pkt))) + self.server = pkt['IP'].dst + self.client = pkt['IP'].src + + # 0 is now, 1 is the future Flags + self.server_state = "LISTEN" + 
self.client_state = "SYN_SENT" + + self.server_close_time = -1.0 + self.client_close_time = -1.0 + self.fin_wait_time = -1.0 + + + + def next_state(self, pkt): + if not 'TCP' in pkt: + raise Exception("Not a TCP Packet") + + # determine in what context we are handling this packet + flow = create_flow(pkt) + if flow not in self.flows: + raise Exception("Not a valid packet for this model") + + if pkt['IP'].dst == self.server: + v = self.handle_client_pkt(pkt) + if self.is_fin_wait(): + self.fin_wait_time = pkt.time + return v + else: + v = self.handle_server_pkt(pkt) + if self.is_fin_wait(): + self.fin_wait_time = pkt.time + return v + + + raise Exception("Not a valid packet for this model") + + + def get_states(self): + return (self.client_state, self.server_state) + + + def build_flags(self, sflags): + return sum([TCP_FLAGS[i] for i in sflags]) + + + def active_close(self): + return (self.client_state == self.server_state and self.server_state == "CLOSED") + + def passive_close(self): + return (self.client_state == "LAST_ACK" and self.server_state == "CLOSE_WAIT") + + def is_established(self): + return (self.client_state == self.server_state and self.server_state == "ESTABLISHED") + + def client_prehandshake(self): + return (self.client_state == "SYN_SENT") or (self.client_state == "SYN_RCVD") + + def server_prehandshake(self): + return (self.server_state == "SYN_SENT") or (self.server_state == "SYN_RCVD") or (self.server_state == "LISTEN") + + def is_fin_wait(self): + return self.client_state.find("FIN_WAIT") > -1 or self.server_state.find("FIN_WAIT") > -1 + def is_prehandshake(self): + return self.client_prehandshake() and self.server_prehandshake() + + def is_closed(self): + return self.passive_close() or self.active_close() + + def handle_client_pkt(self, pkt): + flags = pkt['TCP'].flags + client_got_closed = False + server_got_closed = False + + if flags == self.build_flags("R"): + self.client_state = "CLOSED" + self.server_state = "CLOSED" + server_got_closed 
= True + client_got_closed = True + + elif flags == self.build_flags("RA"): + self.client_state = "CLOSED" + self.server_state = "CLOSED" + server_got_closed = True + client_got_closed = True + elif flags == self.build_flags("S"): + self.server_state = "SYN_SENT" + + elif self.client_state == "SYN_SENT": + if flags & self.build_flags("A") > 0: + self.client_state = "ESTABLISHED" + self.server_state = "ESTABLISHED" + else: + self.client_state = "CLOSED" + server_got_closed = pkt.time + client_got_closed = pkt.time + return self.is_closed() + + elif self.client_state == "SYN_SENT": + if flags & self.build_flags("SA") > 0: + self.client_state = "SYN_RCVD" + + elif self.client_state == "SYN_RECVD" and\ + flags & self.build_flags("F") > 0: + self.client_state = "FIN_WAIT_1" + + elif self.client_state == "ESTABLISHED" and\ + flags == self.build_flags("FA"): + self.client_state = "FIN_WAIT_1" + + elif self.client_state == "FIN_WAIT_1" and\ + flags == self.build_flags("A"): + self.client_state = "CLOSED" + + elif self.client_state == "ESTABLISHED" and\ + self.server_state == "CLOSE_WAIT" and\ + flags & self.build_flags("A") > 0: + self.client_state = "CLOSED" + + if self.server_state == "FIN_WAIT_1" and\ + self.client_state == "CLOSED" and\ + flags == self.build_flags("A"): + self.server_state = "CLOSED" + server_got_closed = True + client_got_closed = True + + if client_got_closed: + self.client_close_timed = pkt.time + if server_got_closed: + self.server_close_timed = pkt.time + + return self.is_closed() + + def handle_server_pkt(self, pkt): + flags = pkt['TCP'].flags + server_got_closed = False + client_got_closed = False + + if flags == self.build_flags("R"): + self.client_state = "CLOSED" + self.server_state = "CLOSED" + server_got_closed = True + client_got_closed = True + + elif flags == self.build_flags("RA"): + self.client_state = "CLOSED" + self.server_state = "CLOSED" + server_got_closed = True + client_got_closed = True + + elif flags == self.build_flags("S"): 
+ self.server_state = "SYN_SENT" + elif self.server_state == "LISTEN" and\ + flags == self.build_flags("SA"): + self.server_state = "SYN_RCVD" + + elif self.server_state == "ESTABLISHED" and\ + flags == self.build_flags("FA"): + self.server_state = "FIN_WAIT_1" + + elif self.server_state == "FIN_WAIT_1" and\ + flags == self.build_flags("A"): + self.server_state = "CLOSED" + server_got_closed = True + + + elif self.server_state == "SYN_RCVD" and\ + flags == self.build_flags("F"): + self.server_state = "FIN_WAIT_1" + + elif self.server_state == "FIN_WAIT_1" and\ + flags == self.build_flags("FA"): + self.server_state = "CLOSED" + + elif self.server_state == "SYN_RCVD" and\ + flags == self.build_flags("A"): + self.server_state = "ESTABLISHED" + + elif self.server_state == "ESTABLISHED" and\ + flags & self.build_flags("F") > 0: + self.server_state = "CLOSE_WAIT" + + elif self.client_state == "FIN_WAIT_1" and\ + flags == self.build_flags("FA"): + self.server_state = "CLOSED" + server_got_closed = True + + elif self.client_state == "CLOSED" and\ + flags == self.build_flags("A"): + self.server_state = "CLOSED" + server_got_closed = True + + if self.client_state == "FIN_WAIT_1" and\ + self.server_state == "CLOSED" and\ + flags == self.build_flags("A"): + self.client_state = "CLOSED" + client_got_closed = True + + if client_got_closed: + self.client_close_timed = pkt.time + if server_got_closed: + self.server_close_timed = pkt.time + + return self.is_closed() + + + +#@conf.commands.register +class TCPState: + ''' + Basic implementation of the TCP State Machine (SM) that + is meant to work with scapy. + + Reference RFC 793. TCP not fully implemented. This installment gives + no attention to timers, congestion windows, etc. This is a basic protocol + implementation so that we can talk to our estranged partner host. 
+ ''' + + def __init__(self): + # RCV should actually be initialized when the + # 3-way hand shake takes place + irs = randint(0, 0x0FFFFFFFF) + iss = randint(0, 0x0FFFFFFFF) + print(("Initializing the SND with %x and RCV with %x" % (iss,irs))) + self.SND = Snd() + self.RCV = Rcv() + self.seg_record = []# keep record of session + self.una_segs = [] # maintain list of un-acked segs + self.previous_payload = None # for ans TCP data send + self.state = "CLOSED" + self.sock = None + self.move_state = self.state_closed + # TCP segment info + self.sport = randint(0, 0x0FFFFFFFF) + self.dport = randint(0, 0x0FFFFFFFF) + self.dst = 'localhost' + + def get_socket(self, s): + if not s is None: + self.sock = s + s = None + elif s is None and self.sock is None: + self.sock = self.init_socket() + + def get_pkt(self, pkt): + p = pkt + if p is None: + p = self.get_base_pkt() + return p + + def add_ether(self, pkt): + if Ether not in pkt: + return Ether() / pkt + return pkt + + def check_flags(self, seg, flag_str): + ''' + Compare segment flag values to a flag string. + + :param seg: segment to compare flag values + :type seg: scapy.TCP + :param flag_str: flag string to compare against the + :type flag_str: string + segment + :return: flag values match + :rtype: boolean + ''' + return self.get_flag_val(flag_str) == seg.flags + + def init_from_pkt(self, seg): + ''' + Initialize the TCP SM based on a TCP segment. + + :param seg: segment to initialize SM from + :type seg: scapy.TCP + ''' + self.RCV.init_from_seg(seg) + self.SND.init_from_seg(seg) + + def build_basic_pkt(self, dst, dport, sport=None): + if sport is None: + sport = randint(0, 65535) + self.sport = sport + self.dport = dport + self.dst = dst + return IP(dst=dst) / TCP(dport=dport, sport=sport) + + def get_rbase_tcp(self, rseg): + ''' + Creates a base TCP segment based on a rcvd segment. 
+ + :param rseg: rcvd segment to base a new segment off of + :type rseg: scapy.TCP + ''' + sport = rseg.dport + dport = rseg.sport + options = rseg.options + return TCP(sport=sport, dport=dport, options=options) + + def get_rbase_ip(self, rpkt): + ''' + Creates a base IP packet based on a rcvd segment. + + :param rpkt: rcvd IP packet to base a new packet off of + :type rpkt: scapy.IP + ''' + dst = rpkt.src + src = rpkt.dst + options = rpkt.options + return IP(src=src, dst=dst, options=options) + + def get_rbase_pkt(self, rpkt): + ''' + Creates a base packet based on a rcvd packet. + + :param rpkt: rcvd segment to base a new packet off of + :type rpkt: scapy.IP/scapy.TCP + ''' + return IP(dst=rpkt[IP].src) / TCP(dport=rpkt[TCP].sport, sport=rpkt[TCP].dport) + + def get_base_tcp(self): + ''' + Creates a base TCP segment based on a defined internal TCP parameters + segment. + ''' + sport = self.sport + dport = self.dport + return TCP(sport=sport, dport=dport) + + def get_base_ip(self): + ''' + Creates a base IP packet based on internal TCP/IP stuffs. + ''' + dst = self.dst + return IP(dst=dst) + + def get_base_pkt(self): + ''' + Creates a base packet based on a rcvd packet. + ''' + return IP(dst=self.dst) / TCP(dport=self.dport,sport=self.sport) + + + def update_seg_state(self, seg, payload=None): + ''' + Update the state of a segment based on the TCP state. + + :param seg: segment to update the ack and seq numbers for + :type seg: scapy.TCP + ''' + seg = self.RCV.update_seg(seg)[0] + seg, pay = self.SND.update_seg(seg, payload) + return seg, pay + + def get_flag_val(self, flag_str): + ''' + Get flag values based on flag string. + + :param flag_str: flag string to convert to int + :type flag_str: string + + :return: integer representation of the flag string + :rtype: integer + ''' + flags = 0 + for i in flag_str: + flags += TCP_FLAGS[i] + return flags + + def check_pkt(self, pkt): + ''' + Check to see if the pkt contains a TCP segment. 
+ + :param pkt: packet that may or may not contain a pkt + :type pkt: scapy.Packet + + :return: TCP payload is in the packet + :rtype: boolean + ''' + return not pkt is None and TCP in pkt + + def update_from_pkt(self, pkt): + ''' + Update TCP state from the given packet. + + :param pkt: packet that is used to update TCP state + :type pkt: scapy.Packet + + :return: successful update + :rtype: boolean + ''' + if self.check_pkt(pkt): + seg = pkt[TCP] + x = self.update_snd(seg) + y = self.update_rcv(seg) + return x and y + return False + + def update_snd(self, seg): + ''' + Update the SND (seq numbers and such) portion of the TCP SM. + + :param seg: TCP segment + :type seg: scapy.TCP + + :return: successful update + :rtype: boolean + ''' + return self.SND.update_from_seg(seg) + + def update_rcv(self, seg): + ''' + Update the RCV (rcv numbers and such) portion of the TCP SM. + + :param seg: TCP segment + :type seg: scapy.TCP + + :return: successful update + :rtype: boolean + ''' + return self.RCV.update_from_seg(seg) + + # handle send syn stuff + def create_seg(self, seg=None, flags="S", payload=None ): + ''' + Create a segment based on the TCP SM, flags, and payload. + + :param seg: TCP segment + :type seg: scapy.TCP + :param flags: flags string to set in the segment + :type flags: string + :param payload: payload to include in the segment + :type payload: string + + :return: tuple of the TCP segment and unused payload + + :rtype: (scapy.TCP, string) + ''' + s = self.get_pkt(seg) + seg = None + pay = payload + payload = None + s, pay = self.update_seg_state(s, pay) + s.flags = self.get_flag_val(flags) + return s, pay + + def rcv_syn(self, rpkt): + ''' + Update TCP SM based on rcv'd syn packet. 
+ + :param rpkt: IP/TCP pkt + :type rpkt: scapy.Packet + ''' + self.dport = seg.sport + self.sport = seg.dport + self.dst = seg.src + + # init tcp state + self.RCV.init_from_seg(rpkt[TCP]) + self.state = "SYN_RCVD" + + def rcv_syn_ans(self, rpkt, s=None): + ''' + Update TCP SM based on rcv'd a syn packet and + respond automatically. + + :param rpkt: IP/TCP pkt + :type rpkt: scapy.Packet + :param s: socket to send packet out on + :type s: scapy.L3Socket + ''' + self.get_socket(s) + s = None + + self.rcv_syn(rpkt) + # get IP and TCP vals + pkt = self.get_base_pkt() + self.state = "SYN_RCVD" + self.move_state = self.state_synrcvd + return self.send_pkt(pkt, self.sock, flags="SA") + + def rcv_synack(self, rpkt): + ''' + Update TCP SM based on rcv'd a syn-ack packet. + + :param rpkt: IP/TCP pkt + :type rpkt: scapy.Packet + ''' + if self.check_pkt(rpkt): + self.init_from_pkt(rpkt[TCP]) + + def rcv_synack_ans(self, rpkt, s=None): + ''' + Update TCP SM based on rcv'd a syn-ack packet and + respond automatically. + + :param rpkt: IP/TCP pkt + :type rpkt: scapy.Packet + :param s: socket to send packet out on + :type s: scapy.L3Socket + ''' + self.get_socket(s) + s = None + + self.rcv_synack(rpkt) + pkt = self.get_base_pkt() + print ("Inside syn-ack ans machine") + #rpkt.show() + #pkt.show() + return self.send_pkt(pkt, s, flags="A") + + def send_pkt(self, pkt=None, s=None, flags=None, payload=None): + ''' + Update TCP Segment and Send the full packet. 
+ + :param s: socket to send packet out on + :type s: scapy.L3Socket + :param pkt: IP/TCP pkt + :type pkt: scapy.Packet + :param flags: flags to set in the segment + :type flags: string + :param payload: payload to include in the packet + :type payload: string + + :return: packet received from sending the pkt + + :rtype: scapy.Packet + ''' + p = self.get_pkt(pkt) + self.get_socket(s) + s = pkt = None + + #pkt = self.add_ether(pkt) + p[TCP],pay = self.create_seg(p[TCP], flags=flags,payload=payload) + rpkt = self.send_rcv_pkts(self.sock, p) + if rpkt is None or not TCP in rpkt: + return None, pay + return rpkt, pay + + def rcv_fin(self, pkt): + self.update_from_pkt(pkt) + + def rcv_fin_ans(self, rpkt, s=None): + # skip over FIN_WAIT_* phases and + # LAST_ACK states + if rpkt is None or\ + not TCP in None: + return None + self.rcv_fin(rpkt) + if rpkt[TCP].flags == self.get_flag_val("F") or\ + rpkt[TCP].flags == self.get_flag_val("FA") and\ + self.state == "ESTABLISHED": + self.state = "CLOSED" + return self.send_pkt( s=s, flags="FA") + elif rpkt[TCP].flags == self.get_flag_val("F") or\ + rpkt[TCP].flags == self.get_flag_val("FA") and\ + self.state == "FIN_WAIT_1": + self.state = "CLOSED" + return self.send_pkt( s=s, flags="A") + return (None, None) + + def rcv_seg_ans(self, rpkt, s): + if rpkt is None or\ + not TCP in rpkt: + return None + rflags = rpkt[TCP].flags + if rflags == self.get_flag_val("S"): + return self.rcv_syn_ans(rpkt, s) + elif rflags == self.get_flag_val("A"): + # TODO this is only an ACK and + # ot could mean a number of things + # this can not be answered automatically + # yet + return self.rcv_ack_ans(rpkt, s) + elif rflags == self.get_flag_val("F") or\ + rflags == self.get_flag_val("FA"): + return self.rcv_fin_ans(rpkt, s) + elif rflags == self.get_flag_val("SA"): + return self.rcv_synack_ans(rpkt, s) + elif rflags == self.get_flag_val("PA"): + return self.rcv_pshack_ans(rpkt, s) + + def rcv_pshack_ans(self, rpkt, s=None): + if rpkt is None or\ + 
not TCP in None: + return None + + self.update_from_pkt(rpkt) + + def rcv_ack(self, rpkt): + ''' + Update TCP SM based on rcv'd a ack packet. + + :param rpkt: IP/TCP pkt + :type rpkt: scapy.Packet + ''' + self.update_from_pkt(rpkt) + + def rcv_ack_ans(self, rpkt, s=None): + self.rcv_ack(rpkt) + return None + + def send_rcv_pkts(self, s, pkt): + ''' + Send and recv packets. + + :param s: socket to send packet out on + :type s: scapy.L3Socket + :param pkt: IP/TCP pkt + :type pkt: scapy.Packet + + :return: packet recieved from sending the pkt + + :rtype: scapy.Packet + ''' + result = self.quick_send(s, pkt) + if len(result[0]) == 0: + self.seg_record.append((pkt, None)) + return None + rpkt = result[0][0][1] + self.seg_record.append((pkt, rpkt)) + return rpkt + + # TCP state transitioning takes place here + def state_closed(self, rpkt): + self.state == "CLOSED" + return self.state + + def state_listen(self, rpkt, s=None): + if TCP in rpkt and\ + rpkt[TCP].flags == self.get_flag_val("S"): + self.state = "SYN_RCVD" + self.move_state = self.state_syn_rcvd + self.rcv_syn(rpkt) + return self.state + return self.state + + def state_syn_rcvd(self, rpkt, s=None): + if TCP in rpkt and\ + rpkt[TCP].flags == self.get_flag_val("A"): + self.state = "ESTABLISHED" + self.move_state = self.state_established(rpkt, s) + self.rcv_ack(rpkt, s) + #self.send_synack(pkt, s) + + def state_syn_sent(self, rpkt, s=None): + if TCP in rpkt and\ + rpkt[TCP].flags == self.get_flag_val("A"): + self.state = "ESTABLISHED" + self.move_state = self.state_established + return self.rcv_synack_ans(rpkt, s) + + def state_established(self, rpkt, s=None): + if not TCP in rpkt: + return None + + if rpkt[TCP].flags == self.get_flag_val("A"): + return self.rcv_ack(rpkt) + elif rpkt[TCP].flags == self.get_flag_val("PA"): + return self.rcv_pshack(seg) + elif rpkt[TCP].flags == self.get_flag_val("RA"): + pass + #return self.rcv_ack(seg) + elif rpkt[TCP].flags == self.get_flag_val("F"): + # TODO implement rcv_fin + 
self.state = "CLOSE_WAIT" + self.move_state = self.state_close_wait + # do not care about the return value for the + # ack of the fin, since the socket will close + # on the remote end + rpkt2, pay= self.send_pkt(self.get_base_pkt(), s=s,flags="A") + return self.move_state(rpkt) + #return self.rcv_fin_ans(seg) + elif rpkt[TCP].flags == self.get_flag_val("FA"): + return self.rcv_finack(seg) + + def state_close_wait(self, rpkt, s=None): + if self.state == "CLOSE_WAIT": + self.state = "LAST_ACK" + self.move_state = self.state_last_ack + rpkt, pay = self.send_pkt(rpkt, s, flags="F") + return self.move_state(rpkt, s) + + def state_last_ack(self, rpkt, s=None): + if self.state == "LAST_ACK" and\ + TCP in rpkt and\ + rpkt[TCP].flags == self.get_flag_val("A"): + self.state = "CLOSED" + self.move_state = self.state_closed + return False + + def state_closing(self, rpkt, s=None): + if self.state == "CLOSING" and\ + TCP in rpkt and\ + rpkt[TCP].flags == self.get_flag_val("A"): + self.state == "TIME_WAIT" + self.move_state = self.state_time_wait + return True + return False + + def state_time_wait(self, rpkt, s=None): + if self.state == "TIME_WAIT": + # dont care about cheking rpkt from for an ack + # from the fin in the closing state + self.state = "CLOSED" + self.move_state = self.state_closed + return True + return False + + def state_fin_wait_1(self, rpkt, s=None): + if self.state != "FIN_WAIT_1": + return False + if TCP in rpkt and\ + rpkt[TCP].flage == self.get_flag_val("F"): + self.state = "FIN_WAIT_2" + self.move_state = self.state_fin_wait_2 + return self.move_state(rpkt) + + if TCP in rpkt and\ + rpkt[TCP].flags == self.get_flag_val("F"): + # TODO implement rcv_fin_ans + #rpkt = self.rcv_fin(rpkt, s) + self.move_state = self.state_closing + self.state = "CLOSING" + return self.move_state(rpkt, s) + + def state_fin_wait_1(self, rpkt, s=None): + if self.state == "FIN_WAIT_1" and\ + TCP in rpkt and\ + rpkt[TCP].flage == self.get_flag_val("A"): + self.state = "TIME_WAIT" 
+ self.move_state = self.state_fin_wait_2 + return self.move_state(rpkt) + + def rcv_seg(self, rpkt): + self.move_state(rpkt) + + def establish_connection(self, pkt, s=None): + ''' + Send and recv packets. + + :param pkt: IP/TCP pkt + :type pkt: scapy.Packet + :param s: socket to send packet out on + :type s: scapy.L3Socket + + :return: successful connection established, + packet received from sending the pkt + :rtype: boolean, scapy.Packet + ''' + print ("Preparing to establish a TCP Connection..") + self.get_socket(s) + s = None + print ("Prepping and Sending Syn Segment") + rpkt, pay = self.send_pkt(pkt, self.sock, flags="S") + + if rpkt is None or\ + not self.check_flags(rpkt[TCP], "SA"): + return False, rpkt + self.state = "SYN_SENT" + rpkt = self.rcv_synack_ans(rpkt, s) + return True, rpkt + + def listen(self, lport, s=None, timeout=None): + """ + Listen for a connection attempt. + + :param lport: port to look for in the syn packet + :type lport: port to listen for + :param s: scapy socket to listen on, if none one is initialized + :type s: scapy.L2Socket + :param timeout: stop sniffing after a given time (default: None) + :type timeout: length of time to listen for + + :return: successful connection established, + packet received from sending the pkt + + :rtype: boolean, scapy.Packet + """ + print ("Preparing to listen for a TCP Connection..") + self.get_socket(s) + s = None + + print ("Listening for a connection request") + rpkt = self.listen_for_syn(lport, timeout=timeout) + rpkt = self.rcv_syn_ans(rpkt) + if not rpkt is None: + return True, rpkt + return False, rpkt + + def simple_send_data(self, seg, payload=None): + """ + Send data, payload, to the remote host using the TCP state machine. + The data is contained in payload, and any payload that can not be sent + is returned back to the user. 
+ + :param seg: seg contains the data payload + :type seg: scapy.TCP + :param payload: seg data to send + :type payload: string + + :return: successfully sent all data, unsent data + :rtype: (boolean, string) + """ + p = "" + success = False + if not payload is None: + p = payload + payload = None + elif payload is None and\ + not seg.payload is None: + p = str(seg.payload) + seg.payload = None + + while 1: + seg, p = self.SND.update_seg(seg, p) + if seg is None: + success = False + break + + return success, p + + def flush_rcv_socket(self, sock): + ''' + Flush out all the packets from a socket. + + :param sock: socket to read all data out of + :type sock: scapy.SuperSocket + + :return: list of all the packets read out of the socket + :rtype: list + ''' + + pkts = [] + while 1: + pkt = sock.recv(MTU) + if pkt is None: break + pkts.append(pkt) + return pkts + + + def listen_for_syn(self, lport, s=None, timeout=None, sel_timeout=.1): + """ + Listen for a Syn Packet (based on sniff). + + :param lport: port to look for in the syn packet + :type lport: port to listen for + :param s: scapy socket to listen on, if none one is initialized + :type s: scapy.L3Socket + :param timeout: stop sniffing after a given time (default: None) + :type timeout: int + :param sel_timeout: select timeout period + :type sel_timeout: int + """ + self.get_socket(s) + s = None + + syn_filter = lambda pkt: not pkt is None and\ + TCP in pkt and\ + pkt[TCP].flags == self.get_flag_val("S") and\ + pkt[TCP].dport== lport + + if timeout is not None: + stoptime = time.time()+timeout + remain = None + pkts = [] + p = self.flush_rcv_socket(self.sock) + while 1: + try: + if timeout is not None: + remain = stoptime-time.time() + if remain <= 0: + break + sel = select([self.sock],[],[], .1) + if not sel[0] is None: + p = self.sock.recv(MTU) + if p is None: + continue + if syn_filter(p): + return p + except KeyboardInterrupt: + break + return None + + def quick_send(self, sock, pkt, timeout=4, inter=0, 
def init_L3socket(self, iface=None, filter=None, nofilter=0):
    """
    Create a layer-3 socket with ``conf.L3socket`` and keep it in ``self.sock``.

    :param iface: passed through to ``conf.L3socket`` as ``iface``
    :param filter: passed through to ``conf.L3socket`` as ``filter``
    :param nofilter: passed through to ``conf.L3socket`` as ``nofilter``

    :return: the socket that was created (also stored as ``self.sock``)
    """
    print ("Initializing Socket")
    self.sock = conf.L3socket(filter=filter, nofilter=nofilter, iface=iface)
    # Report the socket that was just created; the previous code printed the
    # unrelated name ``socket`` here instead of ``self.sock``.
    print(("The following socket was initialized", str(self.sock)))
    return self.sock
def destroy(self, pkts_cnt=0):
    """
    Drop stored packets of this stream.

    ``self.pkts`` maps a packet timestamp to the packet.  With the default
    ``pkts_cnt`` of 0 every stored packet is removed; with a positive
    ``pkts_cnt`` only the ``pkts_cnt`` packets with the smallest timestamps
    are removed, i.e. the same packets ``get_order_pkts(pkts_cnt)`` returns.
    The cached ``self.ordered_pkts`` list is always invalidated.

    :param pkts_cnt: number of earliest packets to drop, 0 drops everything
    :type pkts_cnt: int
    """
    # Sort so that a partial destroy removes the earliest packets; dict
    # insertion order is not guaranteed to be timestamp order.
    keys = sorted(self.pkts)
    print(("destroying packets for %s"%self.get_stream_name()))
    if pkts_cnt > 0:
        keys = keys[:pkts_cnt]
        print(("\t...Destroying the first %d pkts"%pkts_cnt))

    # The cache may still reference the packets removed below.
    self.ordered_pkts = None

    for key in keys:
        del self.pkts[key]
def interp_direction(self, dir):
    """
    Translate a numeric direction code into its textual flow description.

    :param dir: 0 (the value ``get_client_server`` returns) or
                1 (the value ``get_server_client`` returns)
    :type dir: int

    :return: ``self.get_client_server_str()`` for 0,
             ``self.get_server_client_str()`` for 1
    :rtype: str
    :raises ValueError: for any other value (``ValueError`` is an
             ``Exception``, so existing ``except Exception`` handlers
             keep working)
    """
    # The helpers are methods; calling them without ``self.`` raised a
    # NameError before any direction could be interpreted.
    if dir == 0:
        return self.get_client_server_str()
    if dir == 1:
        return self.get_server_client_str()
    raise ValueError("Not a valid direction for the client server")
def get_stream_summary(self, pkts_cnt=0):
    """
    Build one summary row per packet, with sizes measured on the TCP layer.

    Every row is a list of: packet time, direction label, TCP layer length,
    running client byte total, running server byte total, running overall
    total, time since the previous packet, time since the previous packet
    of the client, time since the previous packet of the server, and time
    since the first packet.  All fields except the direction label are
    converted with ``str``.

    A packet counts as a client packet when its IP source equals
    ``self.src``; such rows carry ``get_server_client_str()`` as label, all
    other rows carry ``get_client_server_str()``.

    :param pkts_cnt: forwarded to ``get_order_pkts``
    :return: list of rows, in the order ``get_order_pkts`` returned the packets
    :rtype: list
    """
    rows = []
    overall_bytes = 0
    # Per-side running state; the byte totals start out as floats, which
    # shows up in the stringified rows.
    side_bytes = {"client": 0.0, "server": 0.0}
    side_prev_pkt = {"client": None, "server": None}
    side_gap = {"client": 0.0, "server": 0.0}

    ordered = self.get_order_pkts(pkts_cnt)
    for idx, pkt in enumerate(ordered):
        tcp_len = len(pkt['TCP'])
        overall_bytes += tcp_len
        elapsed, since_prev = self.packet_time_spacing_idx(ordered, idx)

        if self.src == pkt['IP'].src:
            side = "client"
            label = self.get_server_client_str()
        else:
            side = "server"
            label = self.get_client_server_str()

        earlier = side_prev_pkt[side]
        if earlier is not None:
            side_gap[side] = self.packet_time_spacing_pkt(pkt, earlier)
        side_prev_pkt[side] = pkt
        side_bytes[side] += tcp_len

        rows.append([str(pkt.time),
                     label,
                     str(tcp_len),
                     str(side_bytes["client"]),
                     str(side_bytes["server"]),
                     str(overall_bytes),
                     str(since_prev),
                     str(side_gap["client"]),
                     str(side_gap["server"]),
                     str(elapsed)])
    return rows
def packet_time_spacing_idx(self, pkts, idx):
    """
    Return the time offsets of ``pkts[idx]`` within the packet list.

    :param pkts: indexable sequence of packets
    :param idx: position of the packet of interest
    :return: ``(time since pkts[0], time since pkts[idx - 1])``, both taken
             from ``packet_time_spacing_pkt``; ``(0.0, 0.0)`` when ``idx``
             is smaller than 1
    :rtype: tuple
    """
    if idx < 1:
        # The first packet has neither a predecessor nor elapsed time.
        return 0.0, 0.0
    current = pkts[idx]
    since_first = self.packet_time_spacing_pkt(current, pkts[0])
    since_previous = self.packet_time_spacing_pkt(current, pkts[idx - 1])
    return since_first, since_previous
def get_stream_data(self):
    """
    Concatenate the payloads collected by ``organize_streams`` per source port.

    ``organize_streams`` returns ``(seq_nos, data)`` where ``data`` maps a
    source port to a list of ``[seq, payload]`` pairs; only ``data`` is used
    here.  Payloads are joined in the order in which they appear in that
    list, the sequence numbers are not consulted.

    :return: mapping of source port to the joined payload bytes; a port
             without payloads maps to ``b''``
    :rtype: dict
    """
    _seq_nos, data = self.organize_streams()
    # b"".join is linear in the total payload size, unlike repeated
    # ``stream = stream + payload`` which copies the buffer every time.
    return {sport: b"".join(payload for _seq, payload in segments)
            for sport, segments in data.items()}
def thread_maintanence(timer_val, stream_extractor, timeout=1000):
    """
    Timer callback: purge timed-out streams and re-arm itself.

    Calls ``stream_extractor.cleanup_timedout_streams(timeout)`` and, as long
    as ``stream_extractor.cleanup`` is true, schedules the next run with a
    new ``threading.Timer`` stored in the module level ``CLEANUP_THREAD``.

    :param timer_val: delay in seconds until the next run
    :param stream_extractor: object providing ``cleanup_timedout_streams``
                             and the ``cleanup`` flag
    :param timeout: stream timeout handed to ``cleanup_timedout_streams``
                    on this and on every re-armed run
    """
    # Without this declaration the assignment below only created a local
    # name, so code that cancels the module level CLEANUP_THREAD never saw
    # the re-armed timer.
    global CLEANUP_THREAD
    print ("Maintanence thread was called!")
    stream_extractor.cleanup_timedout_streams(timeout)
    if not stream_extractor.cleanup:
        print ("Maintanence thread was called, but nothing to maintain")
        return
    # Pass the timeout along; otherwise every run after the first one would
    # silently fall back to the default of 1000.
    CLEANUP_THREAD = threading.Timer(timer_val, thread_maintanence,
                                     args=(timer_val, stream_extractor, timeout))
    CLEANUP_THREAD.start()
def summary(self):
    """
    Describe every forward flow with the amount of data sent by each side.

    The flow keys in ``self.fwd_flows`` are processed in sorted order.  The
    client port is parsed from the text between the first ``:`` of the key
    and the following whitespace, the server port from the text after the
    last ``:``; both are used as integer keys into
    ``self.data_streams[flow]``.

    :return: one ``"<flow> (c) <client bytes> (s) <server bytes>"`` string
             per flow
    :rtype: list
    """
    lines = []
    for flow_key in sorted(self.fwd_flows):
        per_port = self.data_streams[flow_key]
        client_port = int(flow_key.split(':')[1].split()[0])
        client_size = len(per_port[client_port])
        server_port = int(flow_key.split(':')[-1])
        server_size = len(per_port[server_port])
        lines.append("{} (c) {} (s) {}".format(flow_key, client_size, server_size))
    return lines
def remove_stream(self, key):
    """
    Destroy the stream registered under ``key`` and unregister it.

    The stream's ``destroy()`` is called once, then every flow key listed in
    the stream's ``flows`` attribute (and ``key`` itself) is removed from
    ``self.streams``.  Unknown keys are ignored.

    ``self.DEL_LOCK`` is held for the duration of the call, so the caller
    must not already hold it.

    :param key: flow key of the stream to remove
    """
    # ``with`` guarantees the lock is released even if destroy() or the
    # bookkeeping below raises; the lookup happens under the lock so the
    # entry cannot vanish between the check and its use.
    with self.DEL_LOCK:
        stream = self.streams.get(key)
        if stream is None:
            return
        stream.destroy()
        for flow in stream.flows:
            # A flow key that is no longer registered is simply skipped.
            self.streams.pop(flow, None)
        self.streams.pop(key, None)
= filename + flow_fname = filename + + if not self.outputdir is None: + odir = os.path.join(self.outputdir, "pcaps") + pcap_fname = os.path.join(odir, stream_name) + odir = os.path.join(self.outputdir, "flows") + flow_fname = os.path.join(odir, stream_name) + + stream.write_pcap(pcap_fname, pkts_cnt) + stream.write_flow(flow_fname, pkts_cnt) + + self.DEL_LOCK.release() diff --git a/example.pcapng b/example.pcapng Binary files differnew file mode 100644 index 0000000..8496643 --- /dev/null +++ b/example.pcapng |