aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlns <matzeton@googlemail.com>2022-06-13 14:36:21 +0200
committerlns <matzeton@googlemail.com>2022-06-13 14:36:21 +0200
commit13de32fa34acf2e494af8c9e15aa0b8bb6be0a4a (patch)
treeaacbdc6e7e0682484cc3dc98975e636edd106164
Initial commit.
Signed-off-by: lns <matzeton@googlemail.com>
-rw-r--r--README.md28
-rwxr-xr-xTCPSplit.py208
-rw-r--r--TCPState.py984
-rw-r--r--TCPStream.py332
-rw-r--r--TCPStreamExtractor.py237
-rw-r--r--example.pcapngbin0 -> 2952 bytes
6 files changed, 1789 insertions, 0 deletions
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..c379a1c
--- /dev/null
+++ b/README.md
@@ -0,0 +1,28 @@
+tcp-split
+=========
+
+This project is mostly used for my personal research and testing purposes.
+
+Split TCP segments of a stream into smaller ones using Scapy and PCAP files.
+Inspired by, and partly copied from, [scapy-tcp-extractor](https://github.com/deeso/scapy-tcp-extractor).
+
+```shell
+usage: TCPSplit.py [-h] [-o OUTPUT] [-s] [-l LENGTH] input
+
+positional arguments:
+ input PCAP input file
+
+options:
+ -h, --help show this help message and exit
+ -o OUTPUT, --output OUTPUT
+ PCAP output file
+ -s, --summary Print found TCP Streams to stdout
+ -l LENGTH, --length LENGTH
+ Split TCP payload every n bytes
+```
+
+You can try it with the bundled `example.pcapng`, which contains two TCP streams with some ASCII content, by typing:
+
+`./TCPSplit.py -o ./splitted.pcap -l1 -s ./example.pcapng`
+
+This prints a summary of all TCP streams found and splits every TCP payload into 1-byte segments. The resulting `splitted.pcap` file should contain valid TCP streams.
diff --git a/TCPSplit.py b/TCPSplit.py
new file mode 100755
index 0000000..e85a41f
--- /dev/null
+++ b/TCPSplit.py
@@ -0,0 +1,208 @@
+#!/usr/bin/env python3
+
+"""
+Follow/Split TCP streams with scapy.
+"""
+
+import argparse
+import scapy
+import scapy.all
+import sys
+import time
+
+import TCPStream
+import TCPStreamExtractor
+
+
+MAX_BYTES_PER_PACKET = 2
+
+
class TCPSplitStreamException(Exception):
    """Error raised when a TCP stream cannot be split."""

    def __init__(self, msg):
        # super(Exception, self) starts the MRO lookup *after* Exception and
        # therefore skips Exception itself; zero-argument super() is correct.
        super().__init__(msg)
+
+
class TCPSplitData(object):
    """Timestamp, direction and raw payload of one captured TCP data segment."""

    def __init__(self, pkt_timestamp, pkt_direction, pkt_payload):
        # direction: False if src->dst, True otherwise
        (self.time,
         self.direction,
         self.payload) = (pkt_timestamp, pkt_direction, pkt_payload)
+
+
class TCPSplitStream(object):
    """Rebuild a captured TCP stream with its payload cut into small segments.

    The rebuilt stream consists of a synthetic three-way handshake, every
    captured payload re-sent in chunks (each chunk is answered by a bare ACK
    from the peer) and a FIN/ACK teardown started by the client side.
    """

    def __init__(self, tcp_stream: TCPStream.TCPStream):
        """Store *tcp_stream* and fetch its packets via ``get_order_pkts()``.

        :raises TypeError: if *tcp_stream* is not a ``TCPStream.TCPStream``
        """
        if not isinstance(tcp_stream, TCPStream.TCPStream):
            raise TypeError('Got type ' + str(type(tcp_stream)) +
                            ', expected ' + str(TCPStream.TCPStream))
        self.stream = tcp_stream
        self.ordered_pkts = self.stream.get_order_pkts()

    def __generate_handshake(self):
        """Return [SYN, SYN-ACK, ACK] and initialise ``self.seq``/``self.ack``.

        The first three captured packets are assumed to be the original
        handshake (TODO confirm against ``get_order_pkts``); their sequence
        numbers and timestamps are reused.

        :raises TCPSplitStreamException: if fewer than three packets exist
        """
        if len(self.ordered_pkts) < 3:
            raise TCPSplitStreamException(
                'Stream has only ' + str(len(self.ordered_pkts)) +
                ' packet(s), at least 3 (SYN, SYN-ACK, ACK) are required')

        self.seq = self.ordered_pkts[0][scapy.all.TCP].seq  # TCP-SYN
        self.ack = self.ordered_pkts[1][scapy.all.TCP].seq  # TCP-SYN-ACK
        # ip2dst: header for packets travelling src -> dst, ip2src: the reverse
        self.ip2dst = scapy.all.IP(src=self.stream.src, dst=self.stream.dst)
        self.ip2src = scapy.all.IP(src=self.stream.dst, dst=self.stream.src)

        syn = scapy.all.TCP(sport=self.stream.sport, dport=self.stream.dport,
                            flags='S', seq=self.seq, ack=0)
        # SYN consumes one sequence number; wrap at 32 bit
        self.seq = (self.seq + 1) & 0xFFFFFFFF
        synack = scapy.all.TCP(sport=self.stream.dport, dport=self.stream.sport,
                               flags='SA', seq=self.ack, ack=self.seq)
        self.ack = (self.ack + 1) & 0xFFFFFFFF
        ack = scapy.all.TCP(sport=self.stream.sport, dport=self.stream.dport,
                            flags='A', seq=self.seq, ack=self.ack)

        handshake = [self.ip2dst / syn, self.ip2src / synack, self.ip2dst / ack]
        # reuse the capture times of the original SYN, SYN-ACK and ACK
        for rebuilt, captured in zip(handshake, self.ordered_pkts):
            rebuilt.time = captured.time
        return handshake

    def __generate_payload(self, split_data, bytes_per_packet):
        """Return PSH-ACK/ACK packet pairs carrying ``split_data.payload``.

        :param split_data: payload, direction and timestamp to replay
        :type split_data: TCPSplitData
        :param bytes_per_packet: maximum payload bytes per generated segment
        :type bytes_per_packet: int
        :raises ValueError: if *bytes_per_packet* is not positive
        """
        if bytes_per_packet < 1:
            raise ValueError('bytes_per_packet must be a positive integer')

        # direction False means src -> dst (see TCPSplitData)
        forward = split_data.direction is False
        if forward:
            data_ip, ack_ip = self.ip2dst, self.ip2src
            data_sport, data_dport = self.stream.sport, self.stream.dport
            seq, ack = self.seq, self.ack
        else:
            data_ip, ack_ip = self.ip2src, self.ip2dst
            data_sport, data_dport = self.stream.dport, self.stream.sport
            seq, ack = self.ack, self.seq

        packets = list()
        for i in range(0, len(split_data.payload), bytes_per_packet):
            payload = split_data.payload[i:i + bytes_per_packet]

            tcphdr = scapy.all.TCP(sport=data_sport, dport=data_dport,
                                   flags='PA', seq=seq, ack=ack)
            p = data_ip / tcphdr / scapy.all.Raw(load=payload)
            p.time = split_data.time
            packets.append(p)
            seq = (seq + len(payload)) & 0xFFFFFFFF

            # the peer acknowledges every chunk immediately
            tcphdr = scapy.all.TCP(sport=data_dport, dport=data_sport,
                                   flags='A', seq=ack, ack=seq)
            p = ack_ip / tcphdr
            p.time = split_data.time
            packets.append(p)

        # store the counters back from the point of view of the src side
        if forward:
            self.seq, self.ack = seq, ack
        else:
            self.seq, self.ack = ack, seq

        return packets

    def __generate_finish(self, split_data):
        """Return FIN-ACK, FIN-ACK, ACK stamped with ``split_data.time``."""
        packets = list()

        # We are always closing the connection from the client side for now.
        tcphdr = scapy.all.TCP(sport=self.stream.sport, dport=self.stream.dport,
                               flags='FA', seq=self.seq, ack=self.ack)
        p = self.ip2dst / tcphdr
        p.time = split_data.time
        packets.append(p)
        self.seq = (self.seq + 1) & 0xFFFFFFFF

        tcphdr = scapy.all.TCP(sport=self.stream.dport, dport=self.stream.sport,
                               flags='FA', seq=self.ack, ack=self.seq)
        p = self.ip2src / tcphdr
        p.time = split_data.time
        packets.append(p)
        self.ack = (self.ack + 1) & 0xFFFFFFFF

        tcphdr = scapy.all.TCP(sport=self.stream.sport, dport=self.stream.dport,
                               flags='A', seq=self.seq, ack=self.ack)
        p = self.ip2dst / tcphdr
        p.time = split_data.time
        packets.append(p)

        return packets

    @staticmethod
    def build_stream_key(pkt):
        """Return ``'src:sport -> dst:dport'`` for the IP/TCP layers of *pkt*."""
        return str(pkt['IP'].src) + ':' + str(pkt['TCP'].sport) + \
               ' -> ' + \
               str(pkt['IP'].dst) + ':' + str(pkt['TCP'].dport)

    def split(self, bytes_per_packet=None):
        """Return the rebuilt stream as a list of scapy packets.

        :param bytes_per_packet: payload bytes per segment; defaults to the
                                 module level ``MAX_BYTES_PER_PACKET``
        :type bytes_per_packet: int
        :raises TCPSplitStreamException: if the handshake cannot be rebuilt
        """
        if bytes_per_packet is None:
            bytes_per_packet = MAX_BYTES_PER_PACKET

        data = []
        seq_nos = {}
        for pkt in self.ordered_pkts:
            tcpp = pkt[scapy.all.TCP]
            if scapy.all.Raw not in tcpp:
                # Segments without payload are neither replayed nor tracked:
                # a bare ACK shares its sequence number with the next data
                # segment and must not make that one look like a retransmit.
                continue

            seen = seq_nos.setdefault(TCPSplitStream.build_stream_key(pkt), set())
            if tcpp.seq in seen:
                # retransmit
                continue
            seen.add(tcpp.seq)

            # Compare address and port so both sides may use the same port.
            forward = pkt['IP'].src == self.stream.src and \
                      tcpp.sport == self.stream.sport
            data.append(TCPSplitData(pkt.time, not forward,
                                     tcpp[scapy.all.Raw].load))

        tcp_stream = self.__generate_handshake()

        data.sort(key=lambda tcp_split_data: tcp_split_data.time)
        for payload_packet in data:
            tcp_stream += self.__generate_payload(payload_packet, bytes_per_packet)

        if data:
            last = data[-1]
        else:
            # no payload captured: close right after the last captured packet
            last = TCPSplitData(self.ordered_pkts[-1].time, False, b'')
        tcp_stream += self.__generate_finish(last)

        return tcp_stream
+
+
def printStreams(tse):
    """Print a heading, then every entry of ``tse.summary()`` tab-indented."""
    emit = print
    emit('TCP Streams found:')
    for description in tse.summary():
        emit('\t' + description)
+
+
if __name__ == '__main__':
    # Command line entry point: read a PCAP, optionally print the streams
    # found and write the re-segmented streams to a new PCAP.
    parser = argparse.ArgumentParser()
    parser.add_argument('input', type=str, help='PCAP input file')
    parser.add_argument('-o', '--output', type=str, help='PCAP output file',
                        default=None)
    parser.add_argument('-s', '--summary', action="store_true",
                        help='Print found TCP Streams to stdout',
                        default=False)
    parser.add_argument('-l', '--length', type=int,
                        help='Split TCP payload every n bytes',
                        default=MAX_BYTES_PER_PACKET)
    args = parser.parse_args()

    # 0 would crash inside range() and a negative step would silently drop
    # every payload byte, so reject both up front with a usage error.
    if args.length < 1:
        parser.error('--length must be a positive integer')

    tse = TCPStreamExtractor.TCPStreamExtractor(args.input)
    if args.summary:
        printStreams(tse)

    # TCPSplitStream.split() reads this module level value
    MAX_BYTES_PER_PACKET = args.length

    all_streams = list()
    for session in tse.fwd_flows:
        stream = tse.streams[session]
        tss = TCPSplitStream(stream)
        all_streams.extend(tss.split())

    if args.output is not None:
        scapy.all.wrpcap(args.output, all_streams)
diff --git a/TCPState.py b/TCPState.py
new file mode 100644
index 0000000..2c936d9
--- /dev/null
+++ b/TCPState.py
@@ -0,0 +1,984 @@
+"""
+TCP stream extraction using Scapy.
+
+(c) Praetorian
+Author: Adam Pridgen <adam.pridgen@praetorian.com> || <adam.pridgen@thecoverofnight.com>
+
+This program is free software; you can redistribute it and/or modify it
+under the terms of the GNU General Public License as published by the Free
+Software Foundation; either version 3, or (at your option) any later
+version.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+more details.
+
+You should have received a copy of the GNU General Public License along
+with this program; see the file COPYING. If not, write to the Free
+Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+02110-1301, USA.
+
+Description: tracks TCPStream state between a client and server
+
+"""
+from scapy.all import *
+from random import randint
+
+
def is_syn_pkt(pkt):
    """Tell whether *pkt* has a TCP layer whose flags equal SYN exactly."""
    return 'TCP' in pkt and pkt['TCP'].flags == TCP_FLAGS['S']


def is_synack_pkt(pkt):
    """Tell whether *pkt* has a TCP layer whose flags equal SYN|ACK exactly."""
    return 'TCP' in pkt and pkt['TCP'].flags == (TCP_FLAGS['S'] | TCP_FLAGS['A'])


def create_pkt_flow(pkt):
    """Return the flow name of *pkt* in the direction it travels."""
    ip = pkt['IP']
    return "%s:%s ==> %s:%s"%(ip.src, str(ip.sport), ip.dst, str(ip.dport))


def create_forward_flow(pkt):
    """Return ``'src:sport ==> dst:dport'`` for *pkt*."""
    ip = pkt['IP']
    return "%s:%s ==> %s:%s"%(ip.src, str(ip.sport), ip.dst, str(ip.dport))


def create_reverse_flow(pkt):
    """Return ``'dst:dport ==> src:sport'`` for *pkt*."""
    ip = pkt['IP']
    return "%s:%s ==> %s:%s"%(ip.dst, str(ip.dport), ip.src, str(ip.sport))


# default flow naming used by the state machine
create_flow = create_forward_flow
+
+
# Bidirectional lookup between the one-letter TCP flag names and their bit
# values in the TCP header flags field.
TCP_FLAGS = {"F":0x1, "S":0x2, "R":0x4, "P":0x8,
             "A":0x10, "U":0x20, "E":0x40, "C":0x80,
             0x1:"F", 0x2:"S", 0x4:"R", 0x8:"P",
             0x10:"A", 0x20:"U", 0x40:"E", 0x80:"C"}

# Transition table keyed by state name. Each entry maps a flag string to a
# two element list [next state, flag string]; the second element presumably
# is the answer to send (TODO confirm, nothing visible in this file reads it).
# The former duplicate "LAST_ACK": {} entry was removed: a later key in a dict
# literal replaces an earlier one, so it never had any effect.
TCP_STATES = {"LISTEN":{'S':["SYN_RCVD", 'SA']},
              "SYN_SENT":{'SA':["ESTABLISHED", 'A'],'S':["SYN_RCVD", 'SA'],},
              "SYN_RCVD":{'F':["FIN_WAIT_1", 'A'],'A':["ESTABLISHED", ''],'R':["LISTEN", ''],},
              "CLOSE_WAIT":{"":["LAST_ACK","F"]}, # initiated by the server
              "LAST_ACK":{"A":["CLOSED",""]},
              "ESTABLISHED":{"F":["FIN_WAIT_1",""],},
              "FIN_WAIT_1":{"A":["FIN_WAIT_2",""],"F":["CLOSED","A"],"FA":["TIME_WAIT","A"],},
              "FIN_WAIT_2":{"F":["TIME_WAIT","A"],},
              "CLOSED":{"A":["TIME_WAIT", ""]},}
+
+
def flags_equal(pkt, flag):
    """Tell whether the TCP flags of *pkt* are exactly *flag*."""
    return pkt['TCP'].flags == flag


def flags_set(pkt, flag):
    """Tell whether the TCP flags of *pkt* share at least one bit with *flag*."""
    return (pkt['TCP'].flags & flag) != 0
+
class TCPStateMachine:
    """Track the client and server connection state of one TCP stream.

    The machine is initialised from the opening SYN and advanced with
    ``next_state`` for every following packet of the stream.
    """

    def __init__(self, pkt=None):
        """Initialise from the SYN *pkt*; with ``None`` call ``init`` later."""
        if pkt is not None:
            self.init(pkt)

    def init(self, pkt):
        """Set up flows, endpoints and initial states from a SYN packet.

        :raises Exception: if *pkt* has no TCP layer or is not a plain SYN
        """
        if 'TCP' not in pkt:
            raise Exception("Not a TCP Packet")
        if not is_syn_pkt(pkt):
            raise Exception("Not valid SYN")

        self.flows = set((create_forward_flow(pkt), create_reverse_flow(pkt)))
        # the sender of the SYN is the client
        self.server = pkt['IP'].dst
        self.client = pkt['IP'].src

        self.server_state = "LISTEN"
        self.client_state = "SYN_SENT"

        # -1.0 means "not seen yet"
        self.server_close_time = -1.0
        self.client_close_time = -1.0
        self.fin_wait_time = -1.0

    def next_state(self, pkt):
        """Advance the machine with *pkt* and tell whether the stream is closed.

        :raises Exception: if *pkt* has no TCP layer or belongs to another flow
        """
        if 'TCP' not in pkt:
            raise Exception("Not a TCP Packet")

        # determine in what context we are handling this packet
        if create_flow(pkt) not in self.flows:
            raise Exception("Not a valid packet for this model")

        if pkt['IP'].dst == self.server:
            closed = self.handle_client_pkt(pkt)
        else:
            closed = self.handle_server_pkt(pkt)

        if self.is_fin_wait():
            self.fin_wait_time = pkt.time
        return closed

    def get_states(self):
        """Return ``(client_state, server_state)``."""
        return (self.client_state, self.server_state)

    def build_flags(self, sflags):
        """Return the sum of the ``TCP_FLAGS`` values of the letters in *sflags*."""
        return sum([TCP_FLAGS[i] for i in sflags])

    def active_close(self):
        """Tell whether both sides are in state CLOSED."""
        return (self.client_state == self.server_state and self.server_state == "CLOSED")

    def passive_close(self):
        """Tell whether the client is in LAST_ACK and the server in CLOSE_WAIT."""
        return (self.client_state == "LAST_ACK" and self.server_state == "CLOSE_WAIT")

    def is_established(self):
        """Tell whether both sides are in state ESTABLISHED."""
        return (self.client_state == self.server_state and self.server_state == "ESTABLISHED")

    def client_prehandshake(self):
        """Tell whether the client is still in SYN_SENT or SYN_RCVD."""
        return (self.client_state == "SYN_SENT") or (self.client_state == "SYN_RCVD")

    def server_prehandshake(self):
        """Tell whether the server is still in SYN_SENT, SYN_RCVD or LISTEN."""
        return (self.server_state == "SYN_SENT") or (self.server_state == "SYN_RCVD") or (self.server_state == "LISTEN")

    def is_fin_wait(self):
        """Tell whether either side is in one of the FIN_WAIT states."""
        return self.client_state.find("FIN_WAIT") > -1 or self.server_state.find("FIN_WAIT") > -1

    def is_prehandshake(self):
        """Tell whether both sides are still before the end of the handshake."""
        return self.client_prehandshake() and self.server_prehandshake()

    def is_closed(self):
        """Tell whether the stream counts as closed (passive or active close)."""
        return self.passive_close() or self.active_close()

    def handle_client_pkt(self, pkt):
        """Update the states for a packet sent by the client.

        :return: result of ``is_closed()`` after the update
        :rtype: boolean
        """
        flags = pkt['TCP'].flags
        client_got_closed = False
        server_got_closed = False

        if flags == self.build_flags("R") or flags == self.build_flags("RA"):
            # a reset tears down both sides at once
            self.client_state = "CLOSED"
            self.server_state = "CLOSED"
            server_got_closed = True
            client_got_closed = True

        elif flags == self.build_flags("S"):
            self.server_state = "SYN_SENT"

        elif self.client_state == "SYN_SENT":
            if flags & self.build_flags("A") > 0:
                self.client_state = "ESTABLISHED"
                self.server_state = "ESTABLISHED"
            else:
                self.client_state = "CLOSED"
                # NOTE(review): the original marked both sides here although
                # only client_state changes; kept as found, but recorded via
                # the common path below instead of returning before it.
                server_got_closed = True
                client_got_closed = True
            # NOTE: a second `elif client_state == "SYN_SENT"` branch (SA ->
            # SYN_RCVD) followed here in the original; it could never run
            # because this branch already matches and was dropped.

        elif self.client_state == "SYN_RCVD" and\
             flags & self.build_flags("F") > 0:
            # was spelled "SYN_RECVD", a state name used nowhere else
            self.client_state = "FIN_WAIT_1"

        elif self.client_state == "ESTABLISHED" and\
             flags == self.build_flags("FA"):
            self.client_state = "FIN_WAIT_1"

        elif self.client_state == "FIN_WAIT_1" and\
             flags == self.build_flags("A"):
            self.client_state = "CLOSED"

        elif self.client_state == "ESTABLISHED" and\
             self.server_state == "CLOSE_WAIT" and\
             flags & self.build_flags("A") > 0:
            self.client_state = "CLOSED"

        # the client's final ACK also completes the close of the server
        if self.server_state == "FIN_WAIT_1" and\
           self.client_state == "CLOSED" and\
           flags == self.build_flags("A"):
            self.server_state = "CLOSED"
            server_got_closed = True
            client_got_closed = True

        # was written to the misspelled attributes *_close_timed, so the
        # values initialised in init() were never updated
        if client_got_closed:
            self.client_close_time = pkt.time
        if server_got_closed:
            self.server_close_time = pkt.time

        return self.is_closed()

    def handle_server_pkt(self, pkt):
        """Update the states for a packet sent by the server.

        :return: result of ``is_closed()`` after the update
        :rtype: boolean
        """
        flags = pkt['TCP'].flags
        server_got_closed = False
        client_got_closed = False

        if flags == self.build_flags("R") or flags == self.build_flags("RA"):
            # a reset tears down both sides at once
            self.client_state = "CLOSED"
            self.server_state = "CLOSED"
            server_got_closed = True
            client_got_closed = True

        elif flags == self.build_flags("S"):
            self.server_state = "SYN_SENT"

        elif self.server_state == "LISTEN" and\
             flags == self.build_flags("SA"):
            self.server_state = "SYN_RCVD"

        elif self.server_state == "ESTABLISHED" and\
             flags == self.build_flags("FA"):
            self.server_state = "FIN_WAIT_1"

        elif self.server_state == "FIN_WAIT_1" and\
             flags == self.build_flags("A"):
            self.server_state = "CLOSED"
            server_got_closed = True

        elif self.server_state == "SYN_RCVD" and\
             flags == self.build_flags("F"):
            self.server_state = "FIN_WAIT_1"

        elif self.server_state == "FIN_WAIT_1" and\
             flags == self.build_flags("FA"):
            self.server_state = "CLOSED"

        elif self.server_state == "SYN_RCVD" and\
             flags == self.build_flags("A"):
            self.server_state = "ESTABLISHED"

        elif self.server_state == "ESTABLISHED" and\
             flags & self.build_flags("F") > 0:
            self.server_state = "CLOSE_WAIT"

        elif self.client_state == "FIN_WAIT_1" and\
             flags == self.build_flags("FA"):
            self.server_state = "CLOSED"
            server_got_closed = True

        elif self.client_state == "CLOSED" and\
             flags == self.build_flags("A"):
            self.server_state = "CLOSED"
            server_got_closed = True

        # the server's final ACK also completes the close of the client
        if self.client_state == "FIN_WAIT_1" and\
           self.server_state == "CLOSED" and\
           flags == self.build_flags("A"):
            self.client_state = "CLOSED"
            client_got_closed = True

        # see handle_client_pkt: attribute names fixed from *_close_timed
        if client_got_closed:
            self.client_close_time = pkt.time
        if server_got_closed:
            self.server_close_time = pkt.time

        return self.is_closed()
+
+
+
+#@conf.commands.register
class TCPState:
    '''
    Basic implementation of the TCP State Machine (SM) that
    is meant to work with scapy.

    Reference RFC 793. TCP not fully implemented. This installment gives
    no attention to timers, congestion windows, etc. This is a basic protocol
    implementation so that we can talk to our estranged partner host.

    NOTE(review): ``Snd`` and ``Rcv`` are instantiated in ``__init__`` but
    are not defined in the visible part of this module - confirm where they
    are expected to come from.
    '''

    def __init__(self):
        # RCV should actually be initialized when the
        # 3-way hand shake takes place
        irs = randint(0, 0x0FFFFFFFF)
        iss = randint(0, 0x0FFFFFFFF)
        print(("Initializing the SND with %x and RCV with %x" % (iss,irs)))
        self.SND = Snd()
        self.RCV = Rcv()
        self.seg_record = []  # keep record of session
        self.una_segs = []  # maintain list of un-acked segs
        self.previous_payload = None  # for ans TCP data send
        self.state = "CLOSED"
        self.sock = None
        self.move_state = self.state_closed
        # TCP segment info; ports are 16 bit (was randint(0, 0xFFFFFFFF))
        self.sport = randint(0, 65535)
        self.dport = randint(0, 65535)
        self.dst = 'localhost'

    def get_socket(self, s):
        '''
        Remember *s* as the socket to use, or create one if none is known.

        :param s: socket to use, or None to keep/create the internal one
        :type s: scapy.L3Socket
        '''
        if s is not None:
            self.sock = s
        elif self.sock is None:
            self.sock = self.init_socket()

    def get_pkt(self, pkt):
        '''
        Return *pkt*, or a fresh base packet when *pkt* is None.
        '''
        if pkt is None:
            return self.get_base_pkt()
        return pkt

    def add_ether(self, pkt):
        '''
        Return *pkt* with an Ethernet header in front unless it has one.
        '''
        if Ether not in pkt:
            return Ether() / pkt
        return pkt

    def check_flags(self, seg, flag_str):
        '''
        Compare segment flag values to a flag string.

        :param seg: segment to compare flag values
        :type seg: scapy.TCP
        :param flag_str: flag string to compare against the segment
        :type flag_str: string
        :return: flag values match
        :rtype: boolean
        '''
        return self.get_flag_val(flag_str) == seg.flags

    def init_from_pkt(self, seg):
        '''
        Initialize the TCP SM based on a TCP segment.

        :param seg: segment to initialize SM from
        :type seg: scapy.TCP
        '''
        self.RCV.init_from_seg(seg)
        self.SND.init_from_seg(seg)

    def build_basic_pkt(self, dst, dport, sport=None):
        '''
        Remember the connection parameters and return a matching IP/TCP packet.

        :param dst: destination address
        :param dport: destination port
        :param sport: source port, a random one is chosen when None
        '''
        if sport is None:
            sport = randint(0, 65535)
        self.sport = sport
        self.dport = dport
        self.dst = dst
        return IP(dst=dst) / TCP(dport=dport, sport=sport)

    def get_rbase_tcp(self, rseg):
        '''
        Creates a base TCP segment based on a rcvd segment.

        :param rseg: rcvd segment to base a new segment off of
        :type rseg: scapy.TCP
        '''
        return TCP(sport=rseg.dport, dport=rseg.sport, options=rseg.options)

    def get_rbase_ip(self, rpkt):
        '''
        Creates a base IP packet based on a rcvd segment.

        :param rpkt: rcvd IP packet to base a new packet off of
        :type rpkt: scapy.IP
        '''
        return IP(src=rpkt.dst, dst=rpkt.src, options=rpkt.options)

    def get_rbase_pkt(self, rpkt):
        '''
        Creates a base packet based on a rcvd packet.

        :param rpkt: rcvd segment to base a new packet off of
        :type rpkt: scapy.IP/scapy.TCP
        '''
        return IP(dst=rpkt[IP].src) / TCP(dport=rpkt[TCP].sport, sport=rpkt[TCP].dport)

    def get_base_tcp(self):
        '''
        Creates a base TCP segment based on the internal TCP parameters.
        '''
        return TCP(sport=self.sport, dport=self.dport)

    def get_base_ip(self):
        '''
        Creates a base IP packet based on the internal destination.
        '''
        return IP(dst=self.dst)

    def get_base_pkt(self):
        '''
        Creates a base IP/TCP packet based on the internal parameters.
        '''
        return IP(dst=self.dst) / TCP(dport=self.dport,sport=self.sport)

    def update_seg_state(self, seg, payload=None):
        '''
        Update the state of a segment based on the TCP state.

        :param seg: segment to update the ack and seq numbers for
        :type seg: scapy.TCP
        :return: tuple of the updated segment and what SND hands back for
                 the payload
        '''
        seg = self.RCV.update_seg(seg)[0]
        seg, pay = self.SND.update_seg(seg, payload)
        return seg, pay

    def get_flag_val(self, flag_str):
        '''
        Get flag values based on flag string.

        :param flag_str: flag string to convert to int
        :type flag_str: string

        :return: integer representation of the flag string
        :rtype: integer
        '''
        flags = 0
        for i in flag_str:
            # OR instead of += so a repeated letter cannot corrupt the mask
            flags |= TCP_FLAGS[i]
        return flags

    def check_pkt(self, pkt):
        '''
        Check to see if the pkt contains a TCP segment.

        :param pkt: packet that may or may not contain a TCP segment
        :type pkt: scapy.Packet

        :return: TCP payload is in the packet
        :rtype: boolean
        '''
        return pkt is not None and TCP in pkt

    def update_from_pkt(self, pkt):
        '''
        Update TCP state from the given packet.

        :param pkt: packet that is used to update TCP state
        :type pkt: scapy.Packet

        :return: successful update
        :rtype: boolean
        '''
        if self.check_pkt(pkt):
            seg = pkt[TCP]
            x = self.update_snd(seg)
            y = self.update_rcv(seg)
            return x and y
        return False

    def update_snd(self, seg):
        '''
        Update the SND (seq numbers and such) portion of the TCP SM.

        :param seg: TCP segment
        :type seg: scapy.TCP

        :return: successful update
        :rtype: boolean
        '''
        return self.SND.update_from_seg(seg)

    def update_rcv(self, seg):
        '''
        Update the RCV (rcv numbers and such) portion of the TCP SM.

        :param seg: TCP segment
        :type seg: scapy.TCP

        :return: successful update
        :rtype: boolean
        '''
        return self.RCV.update_from_seg(seg)

    # handle send syn stuff
    def create_seg(self, seg=None, flags="S", payload=None ):
        '''
        Create a segment based on the TCP SM, flags, and payload.

        :param seg: TCP segment
        :type seg: scapy.TCP
        :param flags: flags string to set in the segment, None keeps the
                      flags the segment already has
        :type flags: string
        :param payload: payload to include in the segment
        :type payload: string

        :return: tuple of the TCP segment and unused payload

        :rtype: (scapy.TCP, string)
        '''
        s = self.get_pkt(seg)
        s, pay = self.update_seg_state(s, payload)
        if flags is not None:
            # send_pkt() defaults to flags=None, which used to raise here
            s.flags = self.get_flag_val(flags)
        return s, pay

    def rcv_syn(self, rpkt):
        '''
        Update TCP SM based on rcv'd syn packet.

        :param rpkt: IP/TCP pkt
        :type rpkt: scapy.Packet
        '''
        # `seg` was used here without ever being defined
        seg = rpkt[TCP]
        self.dport = seg.sport
        self.sport = seg.dport
        self.dst = rpkt[IP].src

        # init tcp state
        self.RCV.init_from_seg(seg)
        self.state = "SYN_RCVD"

    def rcv_syn_ans(self, rpkt, s=None):
        '''
        Update TCP SM based on rcv'd a syn packet and
        respond automatically.

        :param rpkt: IP/TCP pkt
        :type rpkt: scapy.Packet
        :param s: socket to send packet out on
        :type s: scapy.L3Socket

        :return: result of ``send_pkt`` for the SYN-ACK
        '''
        self.get_socket(s)

        self.rcv_syn(rpkt)
        # get IP and TCP vals
        pkt = self.get_base_pkt()
        self.state = "SYN_RCVD"
        # was self.state_synrcvd, which does not exist
        self.move_state = self.state_syn_rcvd
        return self.send_pkt(pkt, self.sock, flags="SA")

    def rcv_synack(self, rpkt):
        '''
        Update TCP SM based on rcv'd a syn-ack packet.

        :param rpkt: IP/TCP pkt
        :type rpkt: scapy.Packet
        '''
        if self.check_pkt(rpkt):
            self.init_from_pkt(rpkt[TCP])

    def rcv_synack_ans(self, rpkt, s=None):
        '''
        Update TCP SM based on rcv'd a syn-ack packet and
        respond automatically.

        :param rpkt: IP/TCP pkt
        :type rpkt: scapy.Packet
        :param s: socket to send packet out on
        :type s: scapy.L3Socket

        :return: result of ``send_pkt`` for the ACK
        '''
        self.get_socket(s)

        self.rcv_synack(rpkt)
        pkt = self.get_base_pkt()
        print ("Inside syn-ack ans machine")
        return self.send_pkt(pkt, self.sock, flags="A")

    def send_pkt(self, pkt=None, s=None, flags=None, payload=None):
        '''
        Update TCP Segment and Send the full packet.

        :param s: socket to send packet out on
        :type s: scapy.L3Socket
        :param pkt: IP/TCP pkt, a base packet is built when None
        :type pkt: scapy.Packet
        :param flags: flags to set in the segment
        :type flags: string
        :param payload: payload to include in the packet
        :type payload: string

        :return: tuple of the packet received in answer (None if there was
                 no TCP answer) and the unused payload

        :rtype: (scapy.Packet, string)
        '''
        p = self.get_pkt(pkt)
        self.get_socket(s)

        p[TCP], pay = self.create_seg(p[TCP], flags=flags, payload=payload)
        rpkt = self.send_rcv_pkts(self.sock, p)
        if rpkt is None or TCP not in rpkt:
            return None, pay
        return rpkt, pay

    def rcv_fin(self, pkt):
        '''
        Update TCP SM based on a rcv'd fin packet.
        '''
        self.update_from_pkt(pkt)

    def rcv_fin_ans(self, rpkt, s=None):
        '''
        Update TCP SM based on a rcv'd fin packet and respond automatically.

        :return: result of ``send_pkt``, ``(None, None)`` if nothing was
                 sent, None if *rpkt* carries no TCP segment
        '''
        # skip over FIN_WAIT_* phases and
        # LAST_ACK states
        if not self.check_pkt(rpkt):  # was `TCP in None`, a TypeError
            return None
        self.rcv_fin(rpkt)

        rflags = rpkt[TCP].flags
        # `a or b and c` bound as `a or (b and c)`, so a bare FIN matched the
        # first branch in every state; test the flags first, then the state.
        is_fin = rflags == self.get_flag_val("F") or\
                 rflags == self.get_flag_val("FA")
        if is_fin and self.state == "ESTABLISHED":
            self.state = "CLOSED"
            return self.send_pkt(s=s, flags="FA")
        if is_fin and self.state == "FIN_WAIT_1":
            self.state = "CLOSED"
            return self.send_pkt(s=s, flags="A")
        return (None, None)

    def rcv_seg_ans(self, rpkt, s):
        '''
        Dispatch a rcv'd packet to the matching ``rcv_*_ans`` handler
        according to its exact flag combination.
        '''
        if not self.check_pkt(rpkt):
            return None
        rflags = rpkt[TCP].flags
        if rflags == self.get_flag_val("S"):
            return self.rcv_syn_ans(rpkt, s)
        elif rflags == self.get_flag_val("A"):
            # TODO this is only an ACK and
            # it could mean a number of things
            # this can not be answered automatically
            # yet
            return self.rcv_ack_ans(rpkt, s)
        elif rflags == self.get_flag_val("F") or\
             rflags == self.get_flag_val("FA"):
            return self.rcv_fin_ans(rpkt, s)
        elif rflags == self.get_flag_val("SA"):
            return self.rcv_synack_ans(rpkt, s)
        elif rflags == self.get_flag_val("PA"):
            return self.rcv_pshack_ans(rpkt, s)

    def rcv_pshack_ans(self, rpkt, s=None):
        '''
        Update TCP SM based on a rcv'd psh-ack packet; nothing is sent.
        '''
        if not self.check_pkt(rpkt):  # was `TCP in None`, a TypeError
            return None

        self.update_from_pkt(rpkt)

    def rcv_ack(self, rpkt):
        '''
        Update TCP SM based on rcv'd a ack packet.

        :param rpkt: IP/TCP pkt
        :type rpkt: scapy.Packet
        '''
        self.update_from_pkt(rpkt)

    def rcv_ack_ans(self, rpkt, s=None):
        '''
        Update TCP SM based on a rcv'd ack packet; nothing is sent.
        '''
        self.rcv_ack(rpkt)
        return None

    def send_rcv_pkts(self, s, pkt):
        '''
        Send and recv packets.

        :param s: socket to send packet out on
        :type s: scapy.L3Socket
        :param pkt: IP/TCP pkt
        :type pkt: scapy.Packet

        :return: packet received from sending the pkt

        :rtype: scapy.Packet
        '''
        result = self.quick_send(s, pkt)
        if len(result[0]) == 0:
            self.seg_record.append((pkt, None))
            return None
        # first answered (sent, received) pair, take the received packet
        rpkt = result[0][0][1]
        self.seg_record.append((pkt, rpkt))
        return rpkt

    # TCP state transitioning takes place here
    def state_closed(self, rpkt, s=None):
        # `s` added (optional) because other handlers call
        # self.move_state(rpkt, s); the state was compared, not assigned.
        self.state = "CLOSED"
        return self.state

    def state_listen(self, rpkt, s=None):
        if TCP in rpkt and\
           rpkt[TCP].flags == self.get_flag_val("S"):
            self.state = "SYN_RCVD"
            self.move_state = self.state_syn_rcvd
            self.rcv_syn(rpkt)
        return self.state

    def state_syn_rcvd(self, rpkt, s=None):
        if TCP in rpkt and\
           rpkt[TCP].flags == self.get_flag_val("A"):
            self.state = "ESTABLISHED"
            # the handler was called instead of assigned, and rcv_ack() was
            # given a second argument it does not accept
            self.move_state = self.state_established
            self.rcv_ack(rpkt)
        return self.state

    def state_syn_sent(self, rpkt, s=None):
        # After our SYN the peer answers with SYN-ACK (see TCP_STATES
        # "SYN_SENT"); the test was against a bare ACK although the answer
        # is produced by rcv_synack_ans().
        if TCP in rpkt and\
           rpkt[TCP].flags == self.get_flag_val("SA"):
            self.state = "ESTABLISHED"
            self.move_state = self.state_established
            return self.rcv_synack_ans(rpkt, s)
        return None

    def state_established(self, rpkt, s=None):
        if not TCP in rpkt:
            return None

        rflags = rpkt[TCP].flags
        if rflags == self.get_flag_val("A"):
            return self.rcv_ack(rpkt)
        elif rflags == self.get_flag_val("PA"):
            # was self.rcv_pshack(seg): undefined method and undefined name
            return self.rcv_pshack_ans(rpkt, s)
        elif rflags == self.get_flag_val("RA"):
            pass
        elif rflags == self.get_flag_val("F"):
            self.state = "CLOSE_WAIT"
            self.move_state = self.state_close_wait
            # do not care about the return value for the
            # ack of the fin, since the socket will close
            # on the remote end
            self.send_pkt(self.get_base_pkt(), s=s, flags="A")
            return self.move_state(rpkt, s)
        elif rflags == self.get_flag_val("FA"):
            # was self.rcv_finack(seg): undefined method and undefined name
            return self.rcv_fin_ans(rpkt, s)

    def state_close_wait(self, rpkt, s=None):
        if self.state != "CLOSE_WAIT":
            return False
        self.state = "LAST_ACK"
        self.move_state = self.state_last_ack
        # Send our own FIN; the received packet used to be sent back, which
        # carries the peer's addresses and ports.
        rpkt, pay = self.send_pkt(None, s, flags="F")
        return self.move_state(rpkt, s)

    def state_last_ack(self, rpkt, s=None):
        # rpkt is None when the FIN was not answered
        if self.state == "LAST_ACK" and\
           self.check_pkt(rpkt) and\
           rpkt[TCP].flags == self.get_flag_val("A"):
            self.state = "CLOSED"
            self.move_state = self.state_closed
            return True
        return False

    def state_closing(self, rpkt, s=None):
        if self.state == "CLOSING" and\
           self.check_pkt(rpkt) and\
           rpkt[TCP].flags == self.get_flag_val("A"):
            self.state = "TIME_WAIT"  # was a comparison without effect
            self.move_state = self.state_time_wait
            return True
        return False

    def state_time_wait(self, rpkt, s=None):
        if self.state == "TIME_WAIT":
            # dont care about checking rpkt for an ack
            # from the fin in the closing state
            self.state = "CLOSED"
            self.move_state = self.state_closed
            return True
        return False

    def state_fin_wait_1(self, rpkt, s=None):
        # This method was defined twice (the second silently replaced the
        # first), read the misspelled attribute `.flage` and pointed to a
        # state_fin_wait_2 that did not exist. Rebuilt from the TCP_STATES
        # table / RFC 793: ACK -> FIN_WAIT_2, FIN -> CLOSING,
        # FIN-ACK -> TIME_WAIT.
        if self.state != "FIN_WAIT_1" or not self.check_pkt(rpkt):
            return False

        rflags = rpkt[TCP].flags
        if rflags == self.get_flag_val("A"):
            self.state = "FIN_WAIT_2"
            self.move_state = self.state_fin_wait_2
            return True
        if rflags == self.get_flag_val("F"):
            # TODO implement rcv_fin_ans
            self.state = "CLOSING"
            self.move_state = self.state_closing
            return True
        if rflags == self.get_flag_val("FA"):
            self.state = "TIME_WAIT"
            self.move_state = self.state_time_wait
            return True
        return False

    def state_fin_wait_2(self, rpkt, s=None):
        # FIN_WAIT_2 only waits for the FIN of the peer
        if self.state == "FIN_WAIT_2" and\
           self.check_pkt(rpkt) and\
           rpkt[TCP].flags & self.get_flag_val("F") != 0:
            self.state = "TIME_WAIT"
            self.move_state = self.state_time_wait
            return True
        return False

    def rcv_seg(self, rpkt):
        '''
        Feed a rcv'd packet to the handler of the current state.
        '''
        self.move_state(rpkt)

    def establish_connection(self, pkt, s=None):
        '''
        Perform the active three-way handshake.

        :param pkt: IP/TCP pkt
        :type pkt: scapy.Packet
        :param s: socket to send packet out on
        :type s: scapy.L3Socket

        :return: successful connection established,
                 packet received from sending the pkt
        :rtype: boolean, scapy.Packet
        '''
        print ("Preparing to establish a TCP Connection..")
        self.get_socket(s)
        print ("Prepping and Sending Syn Segment")
        rpkt, pay = self.send_pkt(pkt, self.sock, flags="S")

        if rpkt is None or\
           not self.check_flags(rpkt[TCP], "SA"):
            return False, rpkt
        self.state = "SYN_SENT"
        # rcv_synack_ans() returns the (packet, payload) tuple of send_pkt();
        # unpack it so a packet is returned as documented
        rpkt, pay = self.rcv_synack_ans(rpkt, self.sock)
        self.state = "ESTABLISHED"
        self.move_state = self.state_established
        return True, rpkt

    def listen(self, lport, s=None, timeout=None):
        """
        Listen for a connection attempt.

        :param lport: port to look for in the syn packet
        :type lport: port to listen for
        :param s: scapy socket to listen on, if none one is initialized
        :type s: scapy.L2Socket
        :param timeout: stop sniffing after a given time (default: None)
        :type timeout: length of time to listen for

        :return: successful connection established,
                 packet received from sending the pkt

        :rtype: boolean, scapy.Packet
        """
        print ("Preparing to listen for a TCP Connection..")
        self.get_socket(s)

        print ("Listening for a connection request")
        syn = self.listen_for_syn(lport, timeout=timeout)
        if syn is None:
            # timed out or interrupted; rcv_syn_ans() cannot handle None
            return False, None
        # rcv_syn_ans() returns a (packet, payload) tuple, which was never
        # None and so reported success even without an answer
        rpkt, pay = self.rcv_syn_ans(syn)
        return rpkt is not None, rpkt

    def simple_send_data(self, seg, payload=None):
        """
        Send data, payload, to the remote host using the TCP state machine.
        The data is contained in payload, and any payload that can not be sent
        is returned back to the user.

        NOTE(review): nothing is sent here and ``success`` is never set to
        True; the loop only ends once ``SND.update_seg`` returns None as the
        segment. Left as found because ``Snd`` is not visible - confirm.

        :param seg: seg contains the data payload
        :type seg: scapy.TCP
        :param payload: seg data to send
        :type payload: string

        :return: successfully sent all data, unsent data
        :rtype: (boolean, string)
        """
        p = ""
        success = False
        if payload is not None:
            p = payload
        elif seg.payload is not None:
            p = str(seg.payload)
            seg.payload = None

        while 1:
            seg, p = self.SND.update_seg(seg, p)
            if seg is None:
                success = False
                break

        return success, p

    def flush_rcv_socket(self, sock):
        '''
        Flush out all the packets from a socket.

        :param sock: socket to read all data out of
        :type sock: scapy.SuperSocket

        :return: list of all the packets read out of the socket
        :rtype: list
        '''
        pkts = []
        while 1:
            pkt = sock.recv(MTU)
            if pkt is None:
                break
            pkts.append(pkt)
        return pkts

    def listen_for_syn(self, lport, s=None, timeout=None, sel_timeout=.1):
        """
        Listen for a Syn Packet (based on sniff).

        :param lport: port to look for in the syn packet
        :type lport: port to listen for
        :param s: scapy socket to listen on, if none one is initialized
        :type s: scapy.L3Socket
        :param timeout: stop sniffing after a given time (default: None)
        :type timeout: int
        :param sel_timeout: select timeout period
        :type sel_timeout: int

        :return: the first matching SYN packet, None on timeout or Ctrl-C
        """
        # Imported here: the module only has `from scapy.all import *`, which
        # is not guaranteed to export `time` and `select`.
        import time
        from select import select

        self.get_socket(s)

        syn_flags = self.get_flag_val("S")
        deadline = None if timeout is None else time.time() + timeout

        # drop whatever is already queued on the socket
        self.flush_rcv_socket(self.sock)
        while 1:
            try:
                if deadline is not None and time.time() >= deadline:
                    break
                # sel_timeout was ignored in favour of a hard coded .1, and
                # the ready list was compared to None (never true), so
                # recv() was called even when nothing was readable
                readable = select([self.sock], [], [], sel_timeout)[0]
                if not readable:
                    continue
                p = self.sock.recv(MTU)
                if p is None or TCP not in p:
                    continue
                if p[TCP].flags == syn_flags and p[TCP].dport == lport:
                    return p
            except KeyboardInterrupt:
                break
        return None

    def quick_send(self, sock, pkt, timeout=4, inter=0, verbose=None,chainCC=0, retry=0, multi=0):
        '''
        Quick send is just a wrapper around scapy sndrcv(...)
        Check the code or docs for keywords and other stuff, but we
        simply pass in a packet and a socket.

        :param sock: initialized socket for sending packet data
        :type sock: scapy.L3socket
        :param pkt: packet to send
        :type pkt: scapy.Packet
        '''
        # keywords keep the call independent of sndrcv's parameter order
        return sndrcv(sock, pkt, timeout=timeout, inter=inter, verbose=verbose,
                      chainCC=chainCC, retry=retry, multi=multi)

    def init_socket(self, iface=None, filter=None, nofilter=0):
        '''
        Create the default (layer 3) socket and return it.
        '''
        print ("Initializing Socket")
        return self.init_L3socket(filter=filter, nofilter=nofilter,iface=iface)

    def init_L3socket(self, iface=None, filter=None, nofilter=0):
        '''
        Create a ``conf.L3socket``, store it in ``self.sock`` and return it.
        '''
        print ("Initializing Socket")
        self.sock = conf.L3socket(filter=filter, nofilter=nofilter,iface=iface)
        # printed str(socket), i.e. not the socket that was just created
        print(("The following socket was initialized", str(self.sock)))
        return self.sock

    def init_L2socket(self, iface=None, filter=None, nofilter=0):
        '''
        Create a ``conf.L2socket``, store it in ``self.sock`` and return it.
        '''
        print ("Initializing Socket")
        self.sock = conf.L2socket(filter=filter, nofilter=nofilter,iface=iface)
        # printed str(socket), i.e. not the socket that was just created
        print(("The following socket was initialized", str(self.sock)))
        return self.sock
diff --git a/TCPStream.py b/TCPStream.py
new file mode 100644
index 0000000..e460b6c
--- /dev/null
+++ b/TCPStream.py
@@ -0,0 +1,332 @@
+"""
+TCP stream extraction using Scapy.
+
+(c) Praetorian
+Author: Adam Pridgen <adam.pridgen@praetorian.com> || <adam.pridgen@thecoverofnight.com>
+
+
+This program is free software; you can redistribute it and/or modify it
+under the terms of the GNU General Public License as published by the Free
+Software Foundation; either version 3, or (at your option) any later
+version.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+more details.
+
+You should have received a copy of the GNU General Public License along
+with this program; see the file COPYING. If not, write to the Free
+Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+02110-1301, USA.
+
+description: keeps track of the packets belonging to a specific tcp stream
+
+"""
+
+from scapy.all import *
+from random import randint
+import scapy
+
+from random import randint
+from TCPState import TCPStateMachine
+
+
+from threading import *
+
+# Whitespace-separated column names; they line up, in order, with the ten
+# fields of every row built by the TCPStream summary methods.
+HEADER = "Time Flow_with_Direction PktPayload ClientDataTransfered ServerDataTransfered TotalData TimeBetweenLastPkt TimeBetweenLastClientPacket TimeBetweenLastServerPacket TotalTimeElapsed"
+
+
+# Largest value of an unsigned 32-bit integer.
+# NOTE(review): presumably the TCP sequence-number ceiling; not referenced in this module.
+MAX = 2 ** 32 - 1
+
+# Template for a stream name; filled with (time, src, sport, dst, dport).
+FLOW_NAME = "T:%s_C:%s:%s_S:%s:%s"
+
+class TCPStream:
+    """
+    Keep track of the packets that belong to one TCP connection.
+
+    The packet handed to the constructor defines the endpoints: its source
+    address/port is treated as the client and its destination as the server.
+    Packets are stored in ``self.pkts`` keyed by capture time (as a float).
+    NOTE: two packets with an identical timestamp share a key, so the later
+    one replaces the earlier one.
+    """
+
+    def __init__(self, pkt):
+        """
+        :param pkt: first packet of the connection (needs IP and TCP layers)
+        :type pkt: scapy.Packet
+        """
+        self.src = pkt["IP"].src
+        self.dst = pkt["IP"].dst
+        self.sport = pkt["TCP"].sport
+        self.dport = pkt["TCP"].dport
+        self.time = float(pkt.time)
+        # Defined here so it can be read even before add_pkt() was called.
+        self.last_time = pkt.time
+        self.tcp_state = TCPStateMachine(pkt)
+        self.flows = self.tcp_state.flows
+        self.pkts = {self.time: pkt}
+        self.ordered_pkts = None
+        self.last_time_written = 0.0
+        # Output handles are opened lazily by write_pcap()/write_flow() and
+        # stay open so that later calls keep appending to the same files.
+        self.pcap_file = None
+        self.couchdb_file = None
+        self.flow_file = None
+
+    def len_pkts(self):
+        """Return the number of packets currently stored."""
+        return len(self.pkts)
+
+    def destroy(self, pkts_cnt=0):
+        """
+        Drop stored packets and invalidate the ordered-packet cache.
+
+        :param pkts_cnt: number of oldest packets to drop; 0 or less drops all
+        :type pkts_cnt: int
+        """
+        print("destroying packets for %s" % self.get_stream_name())
+        # Oldest first, so the packets removed are the same ones that
+        # get_order_pkts(pkts_cnt) handed to the writers.
+        keys = sorted(self.pkts)
+        if pkts_cnt > 0:
+            keys = keys[:pkts_cnt]
+            print("\t...Destroying the first %d pkts" % pkts_cnt)
+
+        self.ordered_pkts = None
+
+        for key in keys:
+            del self.pkts[key]
+
+    def get_stream_name(self):
+        """Return the stream name built from FLOW_NAME and the endpoints."""
+        return FLOW_NAME % (self.time, self.src, self.sport, self.dst, self.dport)
+
+    def get_stream_keys(self):
+        """Return the summary column names from HEADER as a list."""
+        return HEADER.split()
+
+    def add_pkt(self, pkt):
+        """
+        Feed ``pkt`` to the TCP state machine and store it.
+
+        :returns: the value of ``tcp_state.next_state(pkt)``
+        """
+        is_closed = self.tcp_state.next_state(pkt)
+        self.pkts[float(pkt.time)] = pkt
+        # A replaced timestamp keeps the dict length unchanged, which the
+        # length check in get_order_pkts() cannot detect; reset explicitly.
+        self.ordered_pkts = None
+        self.last_time = pkt.time
+        return is_closed
+
+    def create_client_directed_flow(self):
+        """Return ``"client ==> server"`` as ``ip:port`` pairs."""
+        return "%s:%s ==> %s:%s" % (self.src, str(self.sport), self.dst, str(self.dport))
+
+    def create_server_directed_flow(self):
+        """Return ``"server ==> client"`` as ``ip:port`` pairs."""
+        return "%s:%s ==> %s:%s" % (self.dst, str(self.dport), self.src, str(self.sport))
+
+    def get_client_server_str(self):
+        """Return ``"client ==> server"`` as ``ip:port`` pairs."""
+        return "%s:%s ==> %s:%s" % (self.src, str(self.sport), self.dst, str(self.dport))
+
+    def get_server_client_str(self):
+        """Return ``"server <== client"`` as ``ip:port`` pairs."""
+        return "%s:%s <== %s:%s" % (self.dst, str(self.dport), self.src, str(self.sport))
+
+    def get_client_server(self):
+        """Return the numeric code (0) used for client packets in app summaries."""
+        return 0
+
+    def get_server_client(self):
+        """Return the numeric code (1) used for server packets in app summaries."""
+        return 1
+
+    def interp_direction(self, dir):
+        """
+        Translate a numeric direction code into its string form.
+
+        :param dir: 0 (see get_client_server) or 1 (see get_server_client)
+        :raises Exception: for any other value
+        """
+        if dir == 0:
+            return self.get_client_server_str()
+        elif dir == 1:
+            return self.get_server_client_str()
+        raise Exception("Not a valid direction for the client server")
+
+    def get_order_pkts(self, pkts_cnt=0):
+        """
+        Return the stored packets sorted by capture time.
+
+        :param pkts_cnt: if > 0, return only the first ``pkts_cnt`` packets
+        :returns: list of packets (the cached list itself when not sliced)
+        """
+        # Rebuild the cache when it is missing or out of step with the store.
+        if self.ordered_pkts is None or len(self.ordered_pkts) != len(self.pkts):
+            self.ordered_pkts = [self.pkts[stamp] for stamp in sorted(self.pkts)]
+        if pkts_cnt > 0:
+            return self.ordered_pkts[:pkts_cnt]
+        return self.ordered_pkts
+
+    def _summarize(self, pkts_cnt, size_of, client_label, server_label, zero):
+        """
+        Build one summary row per packet; shared by the three summary methods.
+
+        :param pkts_cnt: forwarded to get_order_pkts()
+        :param size_of: callable returning the byte count of one packet
+        :param client_label: Flow_with_Direction value for packets sent by self.src
+        :param server_label: Flow_with_Direction value for all other packets
+        :param zero: start value of the per-side totals (0 or 0.0); it decides
+                     whether those totals are rendered as "12" or "12.0"
+        :returns: list of rows in HEADER column order
+        """
+        pkts = self.get_order_pkts(pkts_cnt)
+        rows = []
+        flow_total = 0
+        client_total = zero
+        server_total = zero
+
+        last_client_pkt = None
+        last_server_pkt = None
+        last_client_pkt_time = 0.0
+        last_server_pkt_time = 0.0
+
+        for idx, pkt in enumerate(pkts):
+            size = size_of(pkt)
+            flow_total += size
+            time_elapsed, time_last_pkt = self.packet_time_spacing_idx(pkts, idx)
+
+            if self.src == pkt['IP'].src:
+                flow_info = client_label
+                if last_client_pkt is not None:
+                    last_client_pkt_time = self.packet_time_spacing_pkt(pkt, last_client_pkt)
+                last_client_pkt = pkt
+                client_total += size
+            else:
+                flow_info = server_label
+                if last_server_pkt is not None:
+                    last_server_pkt_time = self.packet_time_spacing_pkt(pkt, last_server_pkt)
+                last_server_pkt = pkt
+                server_total += size
+
+            rows.append([str(pkt.time),
+                         flow_info,
+                         str(size),
+                         str(client_total),
+                         str(server_total),
+                         str(flow_total),
+                         str(time_last_pkt),
+                         str(last_client_pkt_time),
+                         str(last_server_pkt_time),
+                         str(time_elapsed)])
+        return rows
+
+    def get_app_stream_summary(self, pkts_cnt=0):
+        """
+        Summarize the stream by TCP payload size.
+
+        The direction column holds the numeric codes 0 (client) / 1 (server).
+        """
+        return self._summarize(pkts_cnt,
+                               lambda pkt: len(pkt['TCP'].payload),
+                               self.get_client_server(),
+                               self.get_server_client(),
+                               0.0)
+
+    def get_stream_summary(self, pkts_cnt=0):
+        """Summarize the stream by the size of the TCP layer of each packet."""
+        # NOTE(review): client packets are labelled with get_server_client_str()
+        # and server packets with get_client_server_str(); this looks swapped
+        # but is kept unchanged - confirm the intended labels.
+        return self._summarize(pkts_cnt,
+                               lambda pkt: len(pkt['TCP']),
+                               self.get_server_client_str(),
+                               self.get_client_server_str(),
+                               0.0)
+
+    def get_ip_summary(self, pkts_cnt=0):
+        """Summarize the stream by the size of the IP layer of each packet."""
+        # NOTE(review): same label assignment as get_stream_summary(); kept as is.
+        return self._summarize(pkts_cnt,
+                               lambda pkt: len(pkt['IP']),
+                               self.get_server_client_str(),
+                               self.get_client_server_str(),
+                               0)
+
+    def packet_time_spacing_idx(self, pkts, idx):
+        """
+        Return ``(time since pkts[0], time since pkts[idx-1])`` for ``pkts[idx]``.
+
+        For the first packet (idx < 1) both values are 0.0.
+        """
+        if idx < 1:
+            return 0.0, 0.0
+        since_first = self.packet_time_spacing_pkt(pkts[idx], pkts[0])
+        since_previous = self.packet_time_spacing_pkt(pkts[idx], pkts[idx - 1])
+        return since_first, since_previous
+
+    def packet_time_spacing_pkt(self, pkt_a, pkt_b):
+        """Return ``pkt_a.time - pkt_b.time``."""
+        return pkt_a.time - pkt_b.time
+
+    def write_flow(self, filename, pkts_cnt=0):
+        """
+        Write the app-level summary rows to ``<filename>.app_flow``.
+
+        The file is opened on the first call and kept open, so repeated calls
+        append further rows.
+        """
+        if self.flow_file is None:
+            self.flow_file = open(filename + ".app_flow", 'w')
+
+        flow_infos = [" ".join(str(field) for field in row)
+                      for row in self.get_app_stream_summary(pkts_cnt)]
+
+        self.flow_file.write("\n".join(flow_infos) + "\n")
+        self.flow_file.flush()
+
+    def write_pcap(self, filename, pkts_cnt=0):
+        """
+        Write the time-ordered packets to ``<filename>.pcap``.
+
+        The writer is created on the first call and kept open, so repeated
+        calls append further packets.
+        """
+        if self.pcap_file is None:
+            self.pcap_file = scapy.utils.PcapWriter(filename + ".pcap")
+
+        self.pcap_file.write(self.get_order_pkts(pkts_cnt))
+        self.pcap_file.flush()
+
+    def organize_streams(self):
+        """
+        Group the payload-carrying segments by TCP source port.
+
+        Packets are visited in capture-time order; a segment whose sequence
+        number was already recorded for its source port is treated as a
+        retransmit and skipped.
+
+        :returns: ``(seq_nos, data)`` where ``seq_nos[sport]`` lists the
+                  recorded sequence numbers and ``data[sport]`` lists
+                  ``[seq, payload]`` pairs
+        """
+        data = {}
+        seq_nos = {}
+        seen = {}  # per-port set mirroring seq_nos for O(1) membership tests
+        for pkt in self.get_order_pkts():
+            tcpp = pkt[TCP]
+            seq = tcpp.seq
+            sport = tcpp.sport
+            if sport not in data:
+                data[sport] = []
+                seq_nos[sport] = []
+                seen[sport] = set()
+
+            if seq in seen[sport]:
+                # retransmit
+                continue
+
+            # Only segments carrying data are recorded; a bare ACK shares its
+            # sequence number with the data segment that follows it.
+            if Raw in tcpp:
+                data[sport].append([seq, tcpp[Raw].load])
+                seq_nos[sport].append(seq)
+                seen[sport].add(seq)
+        return seq_nos, data
+
+    def get_stream_data(self):
+        """
+        Return the concatenated payload bytes per TCP source port.
+
+        Payloads are joined in the order produced by organize_streams();
+        a port that sent no payload maps to ``b''``.
+        """
+        _, data = self.organize_streams()
+        return {sport: b''.join(payload for _, payload in seqs_payloads)
+                for sport, seqs_payloads in data.items()}
diff --git a/TCPStreamExtractor.py b/TCPStreamExtractor.py
new file mode 100644
index 0000000..91b7fa9
--- /dev/null
+++ b/TCPStreamExtractor.py
@@ -0,0 +1,237 @@
+"""
+TCP stream extraction using Scapy.
+
+(c) Praetorian
+Author: Adam Pridgen <adam.pridgen@praetorian.com> || <adam.pridgen@thecoverofnight.com>
+
+
+
+This program is free software; you can redistribute it and/or modify it
+under the terms of the GNU General Public License as published by the Free
+Software Foundation; either version 3, or (at your option) any later
+version.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+more details.
+
+You should have received a copy of the GNU General Public License along
+with this program; see the file COPYING. If not, write to the Free
+Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+02110-1301, USA.
+
+Description: tracks the extracted TCP streams (TCPStream objects) and removes streams that have timed out
+
+"""
+
+
+import scapy, threading
+import gc
+from threading import *
+from random import randint
+from scapy.all import *
+
+from TCPStream import *
+from TCPState import *
+
+# NOTE(review): not referenced anywhere in this module; the extractor keeps
+# its own per-instance `cleanup` flag instead - confirm before relying on it.
+CLEANUP = True
+# Timer object of the periodic maintenance job; assigned by
+# TCPStreamExtractor.run() and None until then.
+CLEANUP_THREAD = None
+
+
+
+def thread_maintanence(timer_val, stream_extractor, timeout=1000):
+    """
+    Periodic maintenance job: purge timed-out streams, then re-arm itself.
+
+    :param timer_val: delay in seconds before the next run
+    :param stream_extractor: object providing cleanup_timedout_streams(timeout)
+                             and a boolean ``cleanup`` attribute
+    :param timeout: stream timeout handed to cleanup_timedout_streams()
+    """
+    # Publish the re-armed timer so that whoever stops the maintenance job
+    # cancels the timer that is actually pending, not an already fired one.
+    global CLEANUP_THREAD
+    print ("Maintanence thread was called!")
+    stream_extractor.cleanup_timedout_streams(timeout)
+    if not stream_extractor.cleanup:
+        print ("Maintanence thread was called, but nothing to maintain")
+        return
+    # Forward `timeout` too; otherwise every run after the first one would
+    # silently fall back to the default value.
+    CLEANUP_THREAD = threading.Timer(timer_val, thread_maintanence,
+                                     args=(timer_val, stream_extractor, timeout))
+    CLEANUP_THREAD.start()
+
+
+
+
+class TCPStreamExtractor:
+    """
+    Sort the packets of a capture into TCPStream objects.
+
+    Every stream is registered in ``self.streams`` under two keys, its forward
+    and its reverse flow key; ``self.fwd_flows`` holds the forward keys only.
+    After process_packets() ``self.data_streams`` maps each forward key to the
+    per-port payload bytes of that stream.
+    """
+
+    def __init__(self, filename, packet_list=None, process_packets=True,
+                 outputdir=None, pcap_filters=None):
+        """
+        :param filename: capture file; only read when ``packet_list`` is None
+        :param packet_list: already loaded packets to use instead of the file
+        :param process_packets: process all packets right away
+        :param outputdir: directory for written streams; the sub-directories
+                          ``pcaps`` and ``flows`` are created when missing
+        :param pcap_filters: stored in ``self.pcap_filter``
+        """
+        import os
+
+        self.filename = filename
+
+        self.pcap_filter = pcap_filters
+        self.outputdir = outputdir
+
+        if self.outputdir is not None:
+            for subdir in ("pcaps", "flows"):
+                os.makedirs(os.path.join(self.outputdir, subdir), exist_ok=True)
+
+        self.packet_list = packet_list
+        if packet_list is None:
+            self.packet_list = scapy.utils.rdpcap(filename)
+
+        self.pkt_num = 0
+        # a stream is mapped under two flow keys
+        self.streams = {}
+        self.timestamp = 0
+        self.DEL_LOCK = threading.Lock()
+        self.cleanup = True
+        self.timer = 4.0
+        self.data_streams = {}
+        self.fwd_flows = set()
+        self.rev_flows = set()
+
+        if process_packets:
+            self.process_packets()
+
+    def __next__(self):
+        """
+        Consume the next packet and assign it to its stream.
+
+        A new stream is only opened by a SYN packet of an unknown flow;
+        packets of unknown flows without SYN are ignored.
+
+        :returns: the consumed packet, or None once all packets were consumed
+                  (StopIteration is deliberately not raised)
+        """
+        if self.pkt_num >= len(self.packet_list):
+            return None
+        pkt = self.packet_list[self.pkt_num]
+        self.pkt_num += 1
+        self.timestamp = int(pkt.time)
+        # TCPStream reads pkt["IP"], so TCP packets without an IP layer
+        # cannot be tracked and are passed through untouched.
+        if 'TCP' not in pkt or 'IP' not in pkt:
+            return pkt
+
+        fwd_key = create_forward_flow(pkt)
+        rev_key = create_reverse_flow(pkt)
+        if fwd_key not in self.streams and \
+                rev_key not in self.streams and is_syn_pkt(pkt):
+            stream = TCPStream(pkt)
+            self.streams[fwd_key] = stream
+            self.streams[rev_key] = stream
+            self.fwd_flows.add(fwd_key)
+            self.rev_flows.add(rev_key)
+        elif fwd_key in self.streams:
+            self.streams[fwd_key].add_pkt(pkt)
+        return pkt
+
+    def process_packets(self):
+        """
+        Consume all remaining packets and rebuild ``self.data_streams``.
+
+        :returns: ``(number of consumed packets, self.data_streams)``
+        """
+        while self.pkt_num < len(self.packet_list):
+            next(self)
+
+        # create data streams
+        for session in self.fwd_flows:
+            tcp_stream = self.streams.get(session)
+            if tcp_stream is None:
+                # already purged by cleanup_timedout_streams()
+                continue
+            self.data_streams[session] = tcp_stream.get_stream_data()
+        return self.pkt_num, self.data_streams
+
+    @staticmethod
+    def _flow_ports(flow):
+        """Return ``(client_port, server_port)`` parsed from a flow key string."""
+        client_port = int(flow.split(':')[1].split()[0])
+        server_port = int(flow.split(':')[-1])
+        return client_port, server_port
+
+    def summary(self):
+        """
+        Return one line per stream: flow key, client bytes and server bytes.
+
+        A side that sent no payload is reported with 0 bytes.
+        """
+        results = []
+        for sess in sorted(self.fwd_flows):
+            payloads = self.data_streams.get(sess)
+            if payloads is None:
+                continue
+            c_port, s_port = self._flow_ports(sess)
+            c_str_sz = len(payloads.get(c_port, b''))
+            s_str_sz = len(payloads.get(s_port, b''))
+            results.append("{} (c) {} (s) {}".format(sess, c_str_sz, s_str_sz))
+        return results
+
+    def get_client_server_streams(self, key=None):
+        """
+        Return the client and server payload bytes of the extracted streams.
+
+        :param key: forward flow key of one stream, or None for all streams
+        :returns: ``{flow_key: {'client': bytes, 'server': bytes}}``; when
+                  exactly one stream is selected (also when ``key`` is None
+                  and only one stream exists) that inner dict is returned
+                  directly
+        :raises KeyError: if ``key`` is given but has no extracted data
+        """
+        if key is None:
+            keys = [k for k in sorted(self.fwd_flows) if k in self.data_streams]
+        else:
+            keys = [key]
+        results = {}
+        for sess in keys:
+            payloads = self.data_streams[sess]
+            c_port, s_port = self._flow_ports(sess)
+            results[sess] = {'client': payloads.get(c_port, b''),
+                             'server': payloads.get(s_port, b'')}
+        if len(keys) == 1:
+            return list(results.values())[0]
+        return results
+
+    def run(self):
+        """
+        Process all packets while a periodic maintenance timer purges
+        timed-out streams, then write the streams that are still tracked.
+
+        The maintenance timer is always stopped before returning. After a
+        KeyboardInterrupt nothing more is written.
+        """
+        global CLEANUP_THREAD
+        interrupted = False
+        try:
+            CLEANUP_THREAD = threading.Timer(self.timer, thread_maintanence, args=(self.timer, self))
+            CLEANUP_THREAD.start()  # must be running, or memory grows unbounded
+            self.process_packets()
+        except KeyboardInterrupt:
+            interrupted = True
+        except StopIteration:
+            # treated like the regular end of the packet list
+            pass
+        finally:
+            # Clearing the flag also stops a maintenance run that is
+            # re-arming itself at this very moment.
+            self.cleanup = False
+            timer = CLEANUP_THREAD
+            if timer is not None:
+                timer.cancel()
+                if timer.is_alive():
+                    timer.join()
+
+        if interrupted:
+            return
+        # Forward keys only: each stream is registered under two keys and
+        # must not be written twice.
+        for flow in sorted(self.fwd_flows):
+            self.write_stream(flow)
+
+    def cleanup_timedout_streams(self, timeout=180.0):
+        """
+        Write and remove idle streams; flush the packets of very large ones.
+
+        A stream is idle when the time of the most recently consumed packet
+        lies more than ``timeout`` seconds after the stream's last packet.
+        Streams holding more than 10000 packets get those packets written
+        and dropped but stay tracked.
+        """
+        timestamp = self.timestamp
+        purged_streams = set()
+
+        # dont want streams changing underneath us
+        keys = list(self.streams.keys())
+        for key in keys:
+            stream = self.streams.get(key)
+            if stream is None:
+                # removed together with its sibling flow key
+                continue
+            pkt_cnt = stream.len_pkts()
+            # Time of the last packet seen; falls back to the stream start
+            # when the stream does not record one.
+            l_ts = int(getattr(stream, 'last_time', stream.time))
+            if (timestamp - l_ts) > timeout:
+                print("Timeout occurred: %s - %s => %s, Writing stream: %s" % (str(timestamp), str(l_ts), str(timestamp - l_ts), key))
+                purged_streams.add(key)
+                self.write_stream(key)
+                self.remove_stream(key)
+                print("%s purged from current streams" % key)
+
+            elif pkt_cnt > 10000:
+                stream_name = stream.get_stream_name()
+                print("Writing %d of %d packets from stream: %s" % (pkt_cnt, stream.len_pkts(), stream_name))
+                self.write_stream(key, pkt_cnt)
+                stream.destroy(pkt_cnt)
+                print("***Wrote %d packets for stream: %s" % (pkt_cnt, stream_name))
+        print("Purged %d streams of %d from evaluated streams\n\n" % (len(purged_streams), len(keys) // 2))
+
+    def get_streams(self):
+        """Return ``self.data_streams``."""
+        return self.data_streams
+
+    def remove_stream(self, key):
+        """
+        Destroy the stream registered under ``key`` and drop all its flow keys.
+
+        DEL_LOCK is not reentrant: never call this while already holding it.
+        """
+        with self.DEL_LOCK:
+            stream = self.streams.get(key)
+            if stream is None:
+                return
+            flows = list(stream.flows)
+            stream.destroy()
+            for flow in flows:
+                self.streams.pop(flow, None)
+            self.streams.pop(key, None)
+
+    def write_stream(self, key, pkts_cnt=0):
+        """
+        Write the pcap and flow files of the stream registered under ``key``.
+
+        Files are named after the stream; with an output directory they go
+        to its ``pcaps`` and ``flows`` sub-directories.
+
+        :param pkts_cnt: if > 0, write only the first ``pkts_cnt`` packets
+        """
+        import os
+
+        with self.DEL_LOCK:
+            stream = self.streams.get(key)
+            if stream is None:
+                return
+            stream_name = stream.get_stream_name()
+            pcap_fname = stream_name
+            flow_fname = stream_name
+
+            if self.outputdir is not None:
+                pcap_fname = os.path.join(self.outputdir, "pcaps", stream_name)
+                flow_fname = os.path.join(self.outputdir, "flows", stream_name)
+
+            stream.write_pcap(pcap_fname, pkts_cnt)
+            stream.write_flow(flow_fname, pkts_cnt)
diff --git a/example.pcapng b/example.pcapng
new file mode 100644
index 0000000..8496643
--- /dev/null
+++ b/example.pcapng
Binary files differ