aboutsummaryrefslogtreecommitdiff
path: root/examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py
blob: 24e90bf3d5f957f73d7a0bab77a947a7de57118e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#!/usr/bin/env python3

import base64
import json
import re
import sys
import socket
import scapy.all

# Default nDPIsrvd endpoint; overridable via the command line ([host] [port]).
HOST = '127.0.0.1'
PORT = 7000
# Number of leading bytes inspected when looking for the decimal length prefix.
NETWORK_BUFFER_MIN_SIZE = 5
# Upper bound on buffered, not-yet-framed bytes; also caps each recv() call.
NETWORK_BUFFER_MAX_SIZE = 8192

# Flows currently being tracked, keyed by flow_id (populated on 'new' events).
FLOWS = {}

class nDPIsrvdSocket:
    """TCP client for the length-prefixed nDPIsrvd JSON stream.

    Each message on the wire is a decimal length prefix immediately followed
    by a ``{``; the length counts the payload bytes that follow the digits.
    """

    def __init__(self, sock=None):
        # Honour a caller-supplied socket; previously ``sock`` was ignored.
        if sock is None:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock = sock
        # Framing state. Initialised here too so receive() never fails with
        # AttributeError; connect() resets it again.
        self.buffer = bytes()
        self.msglen = 0
        self.digitlen = 0

    def connect(self, host, port):
        """Connect to ``host``:``port`` and reset the framing state."""
        self.sock.connect((host, port))
        self.buffer = bytes()
        self.msglen = 0
        self.digitlen = 0

    def receive(self):
        """Read once from the socket and return every complete message.

        Returns:
            A list of ``(payload, msglen, digitlen)`` tuples, where ``payload``
            is the bytes following the length prefix. The list may be empty.

        Raises:
            RuntimeError: if the peer closed the connection, if the buffer is
                full without holding a complete message, or if the data does
                not start with a ``<digits>{`` length prefix.
        """
        free = NETWORK_BUFFER_MAX_SIZE - len(self.buffer)
        if free <= 0:
            # recv(0) would return b'' and be mistaken for a closed socket.
            raise RuntimeError('Receive buffer full ({} bytes) without a complete message'.format(len(self.buffer)))

        recvd = self.sock.recv(free)
        # recv() signals an orderly shutdown with b'' (bytes, never '').
        if not recvd:
            raise RuntimeError('socket connection broken')
        self.buffer += recvd

        retval = []
        while True:
            if self.msglen == 0:
                # Match on bytes so non-UTF-8 garbage is reported as an
                # invalid packet instead of raising UnicodeDecodeError.
                starts_with_digits = re.match(rb'(\d+)\{', self.buffer[:NETWORK_BUFFER_MIN_SIZE])
                if starts_with_digits is None:
                    if len(self.buffer) < NETWORK_BUFFER_MIN_SIZE:
                        break  # prefix not fully received yet
                    raise RuntimeError('Invalid packet received: {}'.format(self.buffer))
                self.msglen = int(starts_with_digits[1])
                self.digitlen = len(starts_with_digits[1])

            end = self.msglen + self.digitlen
            # '<' (not '<=') so a message ending exactly at the buffer end is
            # returned now rather than after the next recv().
            if len(self.buffer) < end:
                break  # wait for the rest of this message

            retval.append((self.buffer[self.digitlen:end], self.msglen, self.digitlen))
            self.buffer = self.buffer[end:]
            self.msglen = 0
            self.digitlen = 0

        return retval

class Flow:
    """Collect the raw packets of one flow and dump them to a pcap file
    unless the flow has been marked as detected.
    """

    def __init__(self, flow_id=-1):
        self.pktdump = None        # scapy PcapWriter, opened lazily by fin()
        self.was_detected = False  # set by detected(); suppresses the dump
        self.flow_id = flow_id     # default -1: packet without a tracked flow
        self.packets = []          # raw packet bytes in arrival order

    def addPacket(self, pkt):
        """Append one raw packet to this flow."""
        self.packets.append(pkt)

    def detected(self):
        """Mark the flow as detected so that fin() writes nothing."""
        self.was_detected = True

    def fin(self):
        """Write all collected packets to a pcap file, unless detected.

        Flows with ``flow_id == -1`` are appended to ``packet-undetected.pcap``.
        All other flows are written to ``flow-undetected-<flow_id>.pcap``,
        which is truncated first. The writer is always closed, even if a
        write fails.
        """
        if self.was_detected:
            return

        if self.pktdump is None:
            if self.flow_id == -1:
                self.pktdump = scapy.all.PcapWriter('packet-undetected.pcap', append=True, sync=True)
            else:
                self.pktdump = scapy.all.PcapWriter('flow-undetected-{}.pcap'.format(self.flow_id), append=False, sync=True)

        try:
            for packet in self.packets:
                self.pktdump.write(scapy.all.Raw(packet))
        finally:
            self.pktdump.close()
            # Drop the closed writer so it is never written to again.
            self.pktdump = None

def parse_json_str(json_str):
    """Decode one framed nDPIsrvd message and dispatch it.

    Args:
        json_str: a ``(payload, msglen, digitlen)`` tuple as returned by
            ``nDPIsrvdSocket.receive()``; only ``json_str[0]`` is parsed.

    Raises:
        RuntimeError: if the payload is not valid JSON, or if a flow event
            has an unknown name.
    """
    try:
        j = json.loads(json_str[0])
    except (json.decoder.JSONDecodeError, UnicodeDecodeError) as exc:
        # json.loads(bytes) can also fail while decoding the raw bytes.
        raise RuntimeError('JSON Exception: {}\n\nJSON String: {}\n'.format(str(exc), str(json_str))) from exc

    if 'flow_event_name' in j:
        _handle_flow_event(j)
    elif 'packet_event_name' in j:
        _handle_packet_event(j)


def _handle_flow_event(j):
    """Track a flow's lifecycle: create it, mark it detected, or finish it."""
    event = j['flow_event_name'].lower()
    flow_id = j['flow_id']

    if event == 'new':
        print('New flow with id {}.'.format(flow_id))
        FLOWS[flow_id] = Flow(flow_id)
    elif flow_id not in FLOWS:
        print('Ignore flow event with id {} as we did not get any flow-new event.'.format(flow_id))
    elif event == 'end' or event == 'idle':
        if event == 'end':
            print('End flow with id {}.'.format(flow_id))
        else:
            print('Idle flow with id {}.'.format(flow_id))
        FLOWS[flow_id].fin()
        del FLOWS[flow_id]
    elif event == 'detected':
        FLOWS[flow_id].detected()
    elif event == 'guessed':
        print('Guessed flow with id {}.'.format(flow_id))
    elif event == 'not-detected':
        print('Not-detected flow with id {}.'.format(flow_id))
    else:
        raise RuntimeError('unknown flow event name: {}'.format(event))


def _handle_packet_event(j):
    """Attach a decoded packet to its flow, or dump a flow-less packet."""
    buffer_decoded = base64.b64decode(j['pkt'], validate=True)
    event = j['packet_event_name']

    if event == 'packet-flow':
        flow_id = j['flow_id']
        if flow_id not in FLOWS:
            print('Ignore packet-flow event with id {} as we did not get any flow-new event.'.format(flow_id))
            return
        FLOWS[flow_id].addPacket(buffer_decoded)
    elif event == 'packet':
        # A packet without a flow gets a throw-away Flow (flow_id -1) and is
        # written out at once. Previously fin() was never called, so the
        # packet was silently dropped.
        flow = Flow()
        flow.addPacket(buffer_decoded)
        flow.fin()


if __name__ == '__main__':
    # Command line: optional host, then optional port; defaults otherwise.
    argv = sys.argv
    if len(argv) == 1:
        sys.stderr.write('usage: {} [host] [port]\n'.format(argv[0]))
    host = argv[1] if len(argv) > 1 else HOST
    port = int(argv[2]) if len(argv) > 2 else PORT

    sys.stderr.write('Recv buffer size: {}\n'.format(NETWORK_BUFFER_MAX_SIZE))
    sys.stderr.write('Connecting to {}:{} ..\n'.format(host, port))

    nsock = nDPIsrvdSocket()
    nsock.connect(host, port)

    # Read forever; each receive() yields zero or more complete messages.
    while True:
        for message in nsock.receive():
            parse_json_str(message)