aboutsummaryrefslogtreecommitdiff
path: root/TCPStreamExtractor.py
blob: 7acf6931f229562244e4c28564f3fbc87c29086d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""
TCP stream extraction using Scapy.

(c) Praetorian 
Author: Adam Pridgen <adam.pridgen@praetorian.com> || <adam.pridgen@thecoverofnight.com>



This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the Free
Software Foundation; either version 3, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
more details.

You should have received a copy of the GNU General Public License along
with this program; see the file COPYING.  If not, write to the Free
Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.

Description: tracks extracted streams [TCPStream] and removes the streams if they are timedout
    
"""


import scapy, threading
import gc
from threading import *
from random import randint
from scapy.all import *

from TCPStream import *
from TCPState import *

CLEANUP = True
CLEANUP_THREAD = None



def thread_maintanence(timer_val, stream_extractor, timeout=1000):
    new_threads = []
    print ("Maintanence thread was called!")
    stream_extractor.cleanup_timedout_streams(timeout)
    if not stream_extractor.cleanup:
        print ("Maintanence thread was called, but nothing to maintain")
        return
    #gc.collect()
    CLEANUP_THREAD = threading.Timer(timer_val, thread_maintanence, args=(timer_val,stream_extractor ))
    CLEANUP_THREAD.start()
        
        


class TCPStreamExtractor:
    def __init__(self, filename, packet_list=None, process_packets=True, 
                 outputdir=None, bpf_filter=None):
        self.filename = filename
        
        self.bpf_filter = bpf_filter
        self.outputdir=outputdir
        
        if not self.outputdir is None:
            if not os.path.exists(self.outputdir):
                os.mkdir(self.outputdir)
            if not os.path.exists(os.path.join(self.outputdir, "pcaps")):
                os.mkdir(os.path.join(self.outputdir, "pcaps"))
            if not os.path.exists(os.path.join(self.outputdir, "flows")):
                os.mkdir(os.path.join(self.outputdir, "flows"))
        
        self.packet_list = packet_list
        if packet_list is None:
            self.packet_list = scapy.all.sniff(offline=filename, filter=self.bpf_filter, quiet=True)
        
        self.pkt_num = 0
        # a stream is mapped under two flow keys
        self.streams = {}
        self.timestamp = 0
        self.DEL_LOCK = threading.Lock()
        self.cleanup = True
        self.timer = 4.0
        self.data_streams = {}
        self.fwd_flows = set()
        self.rev_flows = set()

        
        if process_packets:
            self.process_packets()
                
        
    def __next__(self):
        if self.pkt_num >= len(self.packet_list):
            return None  
        pkt = self.packet_list[self.pkt_num]
        self.pkt_num += 1
        self.timestamp = int(pkt.time)
        if not 'TCP' in pkt:
            return pkt
        
        fwd_flows = set()
        rev_flows = set()

        flow = (create_forward_flow(pkt), create_reverse_flow(pkt))
        if not flow[0] in self.streams and\
            not flow[1] in self.streams:
            self.streams [flow[0]] = TCPStream(pkt)
            self.streams [flow[1]] = self.streams [flow[0]]
            self.fwd_flows.add(flow[0])
            self.rev_flows.add(flow[1])
        elif flow[0] in self.streams:
            self.streams[flow[0]].add_pkt(pkt)
        return pkt

    def process_packets(self):
        while self.pkt_num < len(self.packet_list):
            next(self)

        # create data streams
        for session in self.fwd_flows:
            tcp_stream = self.streams[session]
            self.data_streams[session] = tcp_stream.get_stream_data()
        return self.pkt_num, self.data_streams

    def summary(self):
        flows = sorted(list(self.fwd_flows))
        fmt = "{} (c) {}  (s) {}"
        c_port = lambda f: int(f.split(':')[1].split()[0])
        s_port = lambda f: int(f.split(':')[-1])
        results = []
        for sess in flows:                                                                   
            http = self.data_streams[sess]
            c_str_sz = len(http[c_port(sess)])
            s_str_sz = len(http[s_port(sess)])
            f = fmt.format(sess, c_str_sz, s_str_sz)
            results.append(f)
        # print ('\n'.join(results))
        return results

    def get_client_server_streams(self, key=None):
        keys = sorted(list(self.fwd_flows)) if key is None else [key]
        c_port = lambda f: int(f.split(':')[1].split()[0])
        s_port = lambda f: int(f.split(':')[-1])
        results = {}
        for sess in keys:                                                                   
            http = self.data_streams[sess]
            client = http[c_port(sess)]
            server = http[s_port(sess)]
            results[sess] = {'client': client, 'server': server}
        # print ('\n'.join(results))
        if len(keys) == 1:
            return list(results.values())[0]
        return results

    def run(self):
        global CLEANUP_THREAD
        try:
            CLEANUP_THREAD = threading.Timer(self.timer, thread_maintanence, args=(self.timer, self ))
            CLEANUP_THREAD.start() # Duh! me needs to start or nom nom nom memories!
            self.process_packets()
        except KeyboardInterrupt:
            self.cleanup = False
            CLEANUP_THREAD.cancel()
            if CLEANUP_THREAD.is_alive():
                CLEANUP_THREAD.join()

        except StopIteration:
            self.cleanup = False
            CLEANUP_THREAD.cancel()
            if CLEANUP_THREAD.is_alive():
                CLEANUP_THREAD.join()
            streams = list(self.streams.keys())
            for stream in streams:
                self.write_stream(stream)


    def cleanup_timedout_streams(self, timeout=180.0):
        timestamp = self.timestamp
        purged_streams = set()
        
        # dont want streams changing underneath us
        keys = list(self.streams.keys())
        for key in keys:
            if not key in self.streams:
                continue
            pkt_cnt = self.streams[key].len_pkts()
            l_ts = int(self.streams[key].time)
            if (timestamp - l_ts) > timeout:
                print(("Timeout occurred: %s - %s => %s, Writing stream: %s"%(str(timestamp),str(l_ts), str(timestamp-l_ts), key)))
                purged_streams.add(key)
                self.write_stream(key)
                self.remove_stream(key)
                print(("%s purged from current streams"%key))
        
            elif pkt_cnt > 10000:
                print(("Writing %d of %d packets from stream: %s"%(pkt_cnt,self.streams[key].len_pkts(), self.streams[key].get_stream_name()))) 
                self.write_stream(key, pkt_cnt)
                self.streams[key].destroy(pkt_cnt)
                print(("***Wrote %d packets for stream: %s"%(pkt_cnt,self.streams[key].get_stream_name()))) 
        print(("Purged %d streams of %d from evaluated streams\n\n"%(len(purged_streams), len(keys)/2))) 

    def get_streams(self):
        return self.data_streams

    def remove_stream(self, key):
        # dont call in cleanup stream, it will deadlock
        if not key in self.streams:
            return
        self.DEL_LOCK.acquire()
        flows = self.streams[key].flows
        self.streams[list(flows)[0]].destroy()
        for i in flows:
            #self.streams[i].destroy()
            del self.streams[i]
        self.DEL_LOCK.release()
        
    def write_stream(self, key,pkts_cnt=0):
        if not key in self.streams:
            return
        self.DEL_LOCK.acquire()
        stream = self.streams[key]
        stream_name = stream.get_stream_name()
        filename = stream_name
        pcap_fname = filename
        flow_fname = filename
        
        if not self.outputdir is None:
            odir = os.path.join(self.outputdir, "pcaps")
            pcap_fname = os.path.join(odir, stream_name)
            odir = os.path.join(self.outputdir, "flows")
            flow_fname = os.path.join(odir, stream_name)
            
        stream.write_pcap(pcap_fname, pkts_cnt)
        stream.write_flow(flow_fname, pkts_cnt)
                
        self.DEL_LOCK.release()