Diffstat (limited to 'TCPStreamExtractor.py')
-rw-r--r--  TCPStreamExtractor.py  237
1 file changed, 237 insertions(+), 0 deletions(-)
diff --git a/TCPStreamExtractor.py b/TCPStreamExtractor.py
new file mode 100644
index 0000000..91b7fa9
--- /dev/null
+++ b/TCPStreamExtractor.py
@@ -0,0 +1,237 @@
+"""
+TCP stream extraction using Scapy.
+
+(c) Praetorian
+Author: Adam Pridgen <adam.pridgen@praetorian.com> || <adam.pridgen@thecoverofnight.com>
+
+
+
+This program is free software; you can redistribute it and/or modify it
+under the terms of the GNU General Public License as published by the Free
+Software Foundation; either version 3, or (at your option) any later
+version.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+more details.
+
+You should have received a copy of the GNU General Public License along
+with this program; see the file COPYING. If not, write to the Free
+Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+02110-1301, USA.
+
+Description: tracks extracted streams [TCPStream] and removes streams once they have timed out
+
+"""
+
+
+import os
+import gc
+import threading
+from scapy.all import *
+
+from TCPStream import *
+from TCPState import *
+
+CLEANUP_THREAD = None
+
+
+
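+# Periodic maintenance hook: purges timed-out streams, then re-arms itself
+# with threading.Timer until the extractor's cleanup flag is cleared.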
+def thread_maintenance(timer_val, stream_extractor, timeout=1000):
+    global CLEANUP_THREAD
+    print("Maintenance thread was called!")
+    if not stream_extractor.cleanup:
+        print("Maintenance thread was called, but nothing to maintain")
+        return
+    stream_extractor.cleanup_timedout_streams(timeout)
+    #gc.collect()
+    CLEANUP_THREAD = threading.Timer(timer_val, thread_maintenance,
+                                     args=(timer_val, stream_extractor, timeout))
+    CLEANUP_THREAD.start()
+
+
+
+
+class TCPStreamExtractor:
+    def __init__(self, filename, packet_list=None, process_packets=True,
+                 outputdir=None, pcap_filters=None):
+        self.filename = filename
+        self.pcap_filter = pcap_filters
+        self.outputdir = outputdir
+
+        if self.outputdir is not None:
+            # mirror the on-disk layout expected by write_stream()
+            if not os.path.exists(self.outputdir):
+                os.mkdir(self.outputdir)
+            if not os.path.exists(os.path.join(self.outputdir, "pcaps")):
+                os.mkdir(os.path.join(self.outputdir, "pcaps"))
+            if not os.path.exists(os.path.join(self.outputdir, "flows")):
+                os.mkdir(os.path.join(self.outputdir, "flows"))
+
+        self.packet_list = packet_list
+        if packet_list is None:
+            self.packet_list = rdpcap(filename)
+
+        self.pkt_num = 0
+        # a stream is mapped under two flow keys (forward and reverse)
+        self.streams = {}
+        self.timestamp = 0
+        self.DEL_LOCK = threading.Lock()
+        self.cleanup = True
+        self.timer = 4.0
+        self.data_streams = {}
+        self.fwd_flows = set()
+        self.rev_flows = set()
+
+        if process_packets:
+            self.process_packets()
+
+
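+    # Iterator protocol: each call to __next__ consumes one packet from
+    # packet_list. TCP packets are bucketed by flow key; a new TCPStream is
+    # opened only on a SYN, and the forward and reverse keys both point at
+    # the same TCPStream object, so traffic in either direction lands in
+    # one stream.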
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if self.pkt_num >= len(self.packet_list):
+            raise StopIteration
+        pkt = self.packet_list[self.pkt_num]
+        self.pkt_num += 1
+        self.timestamp = int(pkt.time)
+        if 'TCP' not in pkt:
+            return pkt
+
+        flow = (create_forward_flow(pkt), create_reverse_flow(pkt))
+        if flow[0] not in self.streams and \
+           flow[1] not in self.streams and is_syn_pkt(pkt):
+            self.streams[flow[0]] = TCPStream(pkt)
+            self.streams[flow[1]] = self.streams[flow[0]]
+            self.fwd_flows.add(flow[0])
+            self.rev_flows.add(flow[1])
+        elif flow[0] in self.streams:
+            self.streams[flow[0]].add_pkt(pkt)
+        return pkt
+
+ def process_packets(self):
+ while self.pkt_num < len(self.packet_list):
+ next(self)
+
+ # create data streams
+ for session in self.fwd_flows:
+ tcp_stream = self.streams[session]
+ self.data_streams[session] = tcp_stream.get_stream_data()
+ return self.pkt_num, self.data_streams
+
+    def summary(self):
+        # One line per forward flow, e.g.
+        # "10.0.0.1:49152 --> 10.0.0.2:80 (c) 312 (s) 1500" (the exact key
+        # format is assumed from TCPState's create_forward_flow).
+        flows = sorted(self.fwd_flows)
+        fmt = "{} (c) {} (s) {}"
+        c_port = lambda f: int(f.split(':')[1].split()[0])
+        s_port = lambda f: int(f.split(':')[-1])
+        results = []
+        for sess in flows:
+            data = self.data_streams[sess]
+            c_str_sz = len(data[c_port(sess)])
+            s_str_sz = len(data[s_port(sess)])
+            results.append(fmt.format(sess, c_str_sz, s_str_sz))
+        return results
+
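+    # Convenience accessor for the reassembled payloads: returns
+    # {'client': ..., 'server': ...} for a single flow key, or a dict of
+    # those keyed by flow when key is None.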
+    def get_client_server_streams(self, key=None):
+        keys = sorted(self.fwd_flows) if key is None else [key]
+        c_port = lambda f: int(f.split(':')[1].split()[0])
+        s_port = lambda f: int(f.split(':')[-1])
+        results = {}
+        for sess in keys:
+            data = self.data_streams[sess]
+            results[sess] = {'client': data[c_port(sess)],
+                             'server': data[s_port(sess)]}
+        if len(keys) == 1:
+            return list(results.values())[0]
+        return results
+
+    def run(self):
+        global CLEANUP_THREAD
+        try:
+            CLEANUP_THREAD = threading.Timer(self.timer, thread_maintenance,
+                                             args=(self.timer, self))
+            # the timer must be started, or timed-out streams pile up in memory
+            CLEANUP_THREAD.start()
+            self.process_packets()
+        except (KeyboardInterrupt, StopIteration):
+            self.cleanup = False
+            CLEANUP_THREAD.cancel()
+            if CLEANUP_THREAD.is_alive():
+                CLEANUP_THREAD.join()
+            # flush whatever streams are still being tracked
+            for stream in list(self.streams.keys()):
+                self.write_stream(stream)
+
+
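+    # Invoked from the maintenance timer: writes out and drops any stream
+    # that has been idle longer than `timeout` seconds, and flushes very
+    # large streams in place so their packet buffers can be freed.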
+    def cleanup_timedout_streams(self, timeout=180.0):
+        timestamp = self.timestamp
+        purged_streams = set()
+
+        # snapshot the keys so streams can't change underneath us
+        keys = list(self.streams.keys())
+        for key in keys:
+            if key not in self.streams:
+                continue
+            pkt_cnt = self.streams[key].len_pkts()
+            l_ts = int(self.streams[key].time)
+            if (timestamp - l_ts) > timeout:
+                print("Timeout occurred: %s - %s => %s, Writing stream: %s" % (timestamp, l_ts, timestamp - l_ts, key))
+                purged_streams.add(key)
+                self.write_stream(key)
+                self.remove_stream(key)
+                print("%s purged from current streams" % key)
+            elif pkt_cnt > 10000:
+                # flush oversized streams in place without removing them
+                print("Writing %d packets from stream: %s" % (pkt_cnt, self.streams[key].get_stream_name()))
+                self.write_stream(key, pkt_cnt)
+                self.streams[key].destroy(pkt_cnt)
+                print("***Wrote %d packets for stream: %s" % (pkt_cnt, self.streams[key].get_stream_name()))
+        # each stream is keyed twice (forward and reverse), hence len(keys) // 2
+        print("Purged %d streams of %d from evaluated streams\n" % (len(purged_streams), len(keys) // 2))
+
+ def get_streams(self):
+ return self.data_streams
+
+    def remove_stream(self, key):
+        # must not be called while DEL_LOCK is held (it will deadlock)
+        if key not in self.streams:
+            return
+        with self.DEL_LOCK:
+            flows = self.streams[key].flows
+            # both flow keys reference the same TCPStream; destroy it once,
+            # then drop every key that points at it
+            self.streams[list(flows)[0]].destroy()
+            for i in flows:
+                del self.streams[i]
+
+    def write_stream(self, key, pkts_cnt=0):
+        if key not in self.streams:
+            return
+        with self.DEL_LOCK:
+            stream = self.streams[key]
+            stream_name = stream.get_stream_name()
+            pcap_fname = stream_name
+            flow_fname = stream_name
+
+            if self.outputdir is not None:
+                pcap_fname = os.path.join(self.outputdir, "pcaps", stream_name)
+                flow_fname = os.path.join(self.outputdir, "flows", stream_name)
+
+            stream.write_pcap(pcap_fname, pkts_cnt)
+            stream.write_flow(flow_fname, pkts_cnt)
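+
+
+if __name__ == '__main__':
+    # A minimal CLI sketch, assuming the capture path is passed as argv[1];
+    # this entry point is illustrative rather than part of the original API.
+    import sys
+    tse = TCPStreamExtractor(sys.argv[1])
+    print('\n'.join(tse.summary()))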