1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
|
#!/usr/bin/env python3
import base64
import json
import re
import sys
import socket
import scapy.all
# Default server address and TCP port, used when the command line does not
# supply a host and/or port.
HOST = '127.0.0.1'
PORT = 7000
# Number of leading buffer bytes examined when looking for a message's decimal
# length prefix. A buffer shorter than this with no recognizable prefix is
# treated as incomplete rather than invalid.
NETWORK_BUFFER_MIN_SIZE = 5
# Upper bound on the bytes held in the receive buffer: each recv() asks for at
# most this value minus what is already buffered.
NETWORK_BUFFER_MAX_SIZE = 8192
# Flows currently being tracked, keyed by flow id. Entries are added on 'new'
# flow events and removed on 'end' / 'idle' flow events.
FLOWS = dict()
class nDPIsrvdSocket:
    """Reader for length-prefixed JSON messages arriving on a stream socket.

    On the wire each message is a decimal length immediately followed by the
    payload, which starts with ``{``. The length counts the payload bytes
    only, not the digits of the prefix itself.
    """

    def __init__(self, sock=None):
        """Wrap *sock*, or create a new IPv4 TCP socket when *sock* is None."""
        if sock is None:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock = sock
        # Initialise parser state here as well, so receive() also works on a
        # socket that was handed in already connected.
        self._reset_state()

    def _reset_state(self):
        """Forget any buffered bytes and any partially parsed header."""
        self.buffer = bytes()
        self.msglen = 0     # payload length of the message being assembled
        self.digitlen = 0   # number of digits in that message's length prefix

    def connect(self, host, port):
        """Connect the underlying socket to (host, port) and reset the parser."""
        self.sock.connect((host, port))
        self._reset_state()

    def receive(self):
        """Read once from the socket and return every complete message.

        Returns:
            A list of ``(payload_bytes, msglen, digitlen)`` tuples, possibly
            empty when no message is complete yet.

        Raises:
            RuntimeError: if the peer closed the connection, if the buffer is
                full without containing a complete message, or if the buffered
                data does not start with a valid length prefix.
        """
        space = NETWORK_BUFFER_MAX_SIZE - len(self.buffer)
        if space <= 0:
            # recv(0) would return b'' and look like a closed connection.
            raise RuntimeError(
                'Receive buffer full ({} bytes) without a complete message'.format(len(self.buffer)))

        recvd = self.sock.recv(space)
        # recv() returns bytes; an empty result means the peer closed the
        # connection. (Comparing against the str '' never matches bytes.)
        if not recvd:
            raise RuntimeError('socket connection broken')
        self.buffer += recvd

        retval = []
        while self.buffer:
            if self.msglen == 0:
                # Match on raw bytes so that non-UTF-8 garbage is reported as
                # an invalid packet instead of raising UnicodeDecodeError.
                header = re.match(rb'(\d+)\{', self.buffer[:NETWORK_BUFFER_MIN_SIZE])
                if header is None:
                    if len(self.buffer) < NETWORK_BUFFER_MIN_SIZE:
                        break  # prefix may simply not have arrived in full yet
                    raise RuntimeError('Invalid packet received: {}'.format(self.buffer))
                self.msglen = int(header[1])
                self.digitlen = len(header[1])

            frame_end = self.digitlen + self.msglen
            if len(self.buffer) < frame_end:
                break  # payload still incomplete; keep header state for next call

            # A message that is completed exactly by the latest recv() is
            # emitted immediately rather than waiting for further data.
            payload = self.buffer[self.digitlen:frame_end]
            self.buffer = self.buffer[frame_end:]
            retval.append((payload, self.msglen, self.digitlen))
            self.msglen = 0
            self.digitlen = 0

        return retval
class Flow:
    """Collects the raw packets of one flow and dumps them if never detected."""

    def __init__(self, flow_id=-1):
        """Create an empty flow; the default id -1 marks a flow-less packet."""
        self.flow_id = flow_id
        self.packets = []
        self.was_detected = False
        self.pktdump = None  # pcap writer, created lazily by fin()

    def addPacket(self, pkt):
        """Remember one raw packet for a possible later dump."""
        self.packets.append(pkt)

    def detected(self):
        """Mark the flow as detected, which makes fin() a no-op."""
        self.was_detected = True

    def fin(self):
        """Write all collected packets to a pcap file unless the flow was detected.

        Flows with the id -1 are appended to 'packet-undetected.pcap'; any
        other flow gets its own 'flow-undetected-<id>.pcap', opened without
        append.
        """
        if self.was_detected is True:
            return

        if self.pktdump is None:
            if self.flow_id == -1:
                filename, append = 'packet-undetected.pcap', True
            else:
                filename, append = 'flow-undetected-{}.pcap'.format(self.flow_id), False
            self.pktdump = scapy.all.PcapWriter(filename, append=append, sync=True)

        writer = self.pktdump
        for raw_packet in self.packets:
            writer.write(scapy.all.Raw(raw_packet))
        writer.close()
def _handle_flow_event(j):
    """Update FLOWS according to one decoded flow event."""
    event = j['flow_event_name'].lower()
    flow_id = j['flow_id']

    if event == 'new':
        print('New flow with id {}.'.format(flow_id))
        FLOWS[flow_id] = Flow(flow_id)
    elif flow_id not in FLOWS:
        # Checked before the event name, so any event for an untracked flow
        # (even one with an unknown name) is skipped instead of raising.
        print('Ignore flow event with id {} as we did not get any flow-new event.'.format(flow_id))
    elif event in ('end', 'idle'):
        if event == 'end':
            print('End flow with id {}.'.format(flow_id))
        else:
            print('Idle flow with id {}.'.format(flow_id))
        # fin() dumps the packets unless the flow was marked as detected.
        FLOWS[flow_id].fin()
        del FLOWS[flow_id]
    elif event == 'detected':
        FLOWS[flow_id].detected()
    elif event in ('guessed', 'not-detected'):
        if event == 'guessed':
            print('Guessed flow with id {}.'.format(flow_id))
        else:
            print('Not-detected flow with id {}.'.format(flow_id))
    else:
        raise RuntimeError('unknown flow event name: {}'.format(event))


def _handle_packet_event(j):
    """Store or dump the base64-encoded packet carried by one packet event."""
    buffer_decoded = base64.b64decode(j['pkt'], validate=True)
    packet_event = j['packet_event_name']

    if packet_event == 'packet-flow':
        flow_id = j['flow_id']
        if flow_id not in FLOWS:
            print('Ignore packet-flow event with id {} as we did not get any flow-new event.'.format(flow_id))
            return
        FLOWS[flow_id].addPacket(buffer_decoded)
    elif packet_event == 'packet':
        # A packet without a flow: wrap it in an anonymous Flow (id -1) and
        # finish it right away so it is written to the shared undetected
        # packet capture. Without fin() the packet would be silently dropped.
        flow = Flow()
        flow.addPacket(buffer_decoded)
        flow.fin()


def parse_json_str(json_str):
    """Decode one received message and apply it to the flow table.

    Args:
        json_str: a sequence whose first element is the JSON payload; the
            receive loop passes (payload, msglen, digitlen) tuples. Only
            element 0 is parsed, the whole value is shown in error messages.

    Raises:
        RuntimeError: if the payload is not valid JSON (chained to the
            original JSONDecodeError) or a flow event has an unknown name.

    Messages with neither 'flow_event_name' nor 'packet_event_name' are
    ignored.
    """
    try:
        j = json.loads(json_str[0])
    except json.decoder.JSONDecodeError as exc:
        raise RuntimeError(
            'JSON Exception: {}\n\nJSON String: {}\n'.format(str(exc), str(json_str))) from exc

    if 'flow_event_name' in j:
        _handle_flow_event(j)
    elif 'packet_event_name' in j:
        _handle_packet_event(j)
if __name__ == '__main__':
    # Script entry point: connect to the server given on the command line
    # (falling back to HOST / PORT) and process messages until an error
    # or interruption ends the loop.
    argc = len(sys.argv)
    if argc == 1:
        # No arguments: show the usage hint, then carry on with the defaults.
        sys.stderr.write('usage: {} [host] [port]\n'.format(sys.argv[0]))

    host = sys.argv[1] if argc > 1 else HOST
    port = int(sys.argv[2]) if argc > 2 else PORT

    sys.stderr.write('Recv buffer size: {}\n'.format(NETWORK_BUFFER_MAX_SIZE))
    sys.stderr.write('Connecting to {}:{} ..\n'.format(host, port))

    nsock = nDPIsrvdSocket()
    nsock.connect(host, port)

    while True:
        # receive() yields zero or more complete messages per socket read.
        for message in nsock.receive():
            parse_json_str(message)
|