aboutsummaryrefslogtreecommitdiff
path: root/examples/py-flow-dashboard/flow-dash.py
blob: 2bf95af4209bece3763db7356014ea2af5653f66 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python3

from collections import deque
import dash
from dash.dependencies import Output, Input
import dash_core_components as dcc
import dash_html_components as html
import multiprocessing
import os
import plotly
import plotly.graph_objs as go
import sys

sys.path.append(os.path.dirname(sys.argv[0]) + '/../share/nDPId')
sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId')
try:
    import nDPIsrvd
    from nDPIsrvd import nDPIsrvdSocket
except ImportError:
    sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
    import nDPIsrvd
    from nDPIsrvd import nDPIsrvdSocket

# Manager process hosting the state that is shared between the nDPIsrvd
# reader process and the Dash web process (both are started in the
# ``__main__`` block at the bottom of this file).
mgr = multiprocessing.Manager()

# flow_id -> per-flow state; written by nDPIsrvd_worker_onJsonLineRecvd and
# read/pruned by the Dash callbacks below.
# NOTE: no ``global`` statement is needed here; at module scope it is a no-op.
shared_flow_dict = mgr.dict()

# Number of samples kept for the live flow-count graph.
FLOW_COUNT_DATAPOINTS = 50

# Bounded sample histories for the flow-count graph.
# X is seeded with one value because update_graph_scatter reads
# live_flow_count_X[-1] on its very first invocation; Y is seeded alongside
# it so both histories always have the same length.
live_flow_count_X = deque([1], maxlen=FLOW_COUNT_DATAPOINTS)
live_flow_count_Y = deque([1], maxlen=FLOW_COUNT_DATAPOINTS)

# Bar labels; update_pie fills its counters in exactly this order.
live_flow_bars = ['risky', 'midstream', 'detected', 'guessed', 'not-detected']
fig = go.Figure()

app = dash.Dash(__name__)
app.layout = html.Div(
    [
        dcc.Graph(id='live-flow-count', animate=True),
        dcc.Graph(id='live-flow-bars',  animate=True, figure=fig),
        # Periodic trigger that drives both graph callbacks.
        dcc.Interval(
            id='graph-update',
            interval=1000,
            n_intervals=0
        ),
    ]
)


@app.callback(
    Output('live-flow-count', 'figure'),
    [Input('graph-update', 'n_intervals')]
)
def update_graph_scatter(n):
    """Append one sample to the flow-count history and redraw the line graph.

    Triggered by the 'graph-update' interval component. ``n`` (the interval
    counter) is not used; the X axis is advanced from the last stored X value
    instead, and the Y value is the current number of entries in
    ``shared_flow_dict``.

    Returns a figure dictionary with a single scatter trace and axis ranges
    fitted to the values currently held in the two bounded histories.
    """
    next_x = live_flow_count_X[-1] + 1
    live_flow_count_X.append(next_x)
    live_flow_count_Y.append(len(shared_flow_dict))

    # Work on plain-list snapshots of the deques from here on.
    xs = list(live_flow_count_X)
    ys = list(live_flow_count_Y)

    trace = plotly.graph_objs.Scatter(
        x=xs,
        y=ys,
        name='Scatter',
        mode='lines+markers'
    )
    layout = go.Layout(
        xaxis={'range': [min(xs), max(xs)]},
        yaxis={'range': [min(ys), max(ys)]},
    )

    return {'data': [trace], 'layout': layout}


@app.callback(
    Output('live-flow-bars', 'figure'),
    [Input('graph-update', 'n_intervals')]
)
def update_pie(n):
    """Rebuild the per-category flow bar chart and drop finished flows.

    Triggered by the 'graph-update' interval component; ``n`` is unused.
    Despite its name, this callback produces a bar chart (``go.Bar``).

    For every flow in ``shared_flow_dict`` the boolean state flags are
    counted, and flows marked with ``remove_me`` are removed from the shared
    dictionary after they have been counted one last time.

    Returns a figure dictionary with one bar trace (labels taken from
    ``live_flow_bars``) and a Y axis ranging from 0 to the largest counter.
    """
    # Flag names, in the same order as the labels in live_flow_bars.
    flag_names = ('is_risky', 'is_midstream', 'is_detected',
                  'is_guessed', 'is_not_detected')
    values = [0] * len(flag_names)

    # keys() is answered by the manager process, so the loop iterates over a
    # copy and removing entries below does not disturb it.
    for flow_id in shared_flow_dict.keys():
        try:
            # Take one plain-dict snapshot per flow instead of going through
            # the proxy for every single flag.
            flow = shared_flow_dict[flow_id].copy()
        except KeyError:
            # The entry vanished between keys() and the lookup (e.g. another
            # invocation of this callback removed it); nothing to count.
            continue

        # .get() tolerates an entry that is missing a flag instead of
        # failing the whole callback with a KeyError.
        for index, flag in enumerate(flag_names):
            if flow.get(flag) is True:
                values[index] += 1

        if flow.get('remove_me') is True:
            # pop() with a default does not raise if the key is already gone.
            shared_flow_dict.pop(flow_id, None)

    # print(values)

    return {
            'data': [
                        go.Bar(name='', x=live_flow_bars, y=values)
                    ],
            'layout': go.Layout(yaxis=dict(range=[0, max(values)]))
           }


def web_worker():
    """Run the Dash application's server via ``app.run_server()``.

    Used as the target of the web process created in the ``__main__`` block.
    """
    app.run_server()


def nDPIsrvd_worker_onJsonLineRecvd(json_dict, current_flow, global_user_data):
    """Track the state of a flow in ``shared_flow_dict`` from one JSON event.

    Passed as the callback to ``nDPIsrvdSocket.loop()`` by nDPIsrvd_worker.

    Parameters:
        json_dict: the decoded JSON message. Messages without a
            'flow_event_name' key are ignored; all others must carry a
            'flow_id'.
        current_flow: unused.
        global_user_data: unused.

    Returns:
        Always True.
        NOTE(review): presumably this tells the socket loop to keep
        running -- confirm against the nDPIsrvd module.

    Raises:
        KeyError: if a flow event carries no 'flow_id'.
    """
    if 'flow_event_name' not in json_dict:
        return True

    # print(json_dict)

    event_name = json_dict['flow_event_name']
    flow_id = json_dict['flow_id']

    flow = shared_flow_dict.get(flow_id)
    if flow is None:
        # Build the complete initial state first and publish it in a single
        # step, so a reader in another process never sees an entry that is
        # still missing some of its flags.
        flow = mgr.dict({
            'is_detected': False,
            'is_guessed': False,
            'is_not_detected': False,
            'is_midstream': False,
            'is_risky': False,
            'remove_me': False,
        })
        shared_flow_dict[flow_id] = flow

    if event_name == 'new':
        if 'midstream' in json_dict and json_dict['midstream'] != 0:
            flow['is_midstream'] = True
    elif event_name == 'guessed':
        flow['is_guessed'] = True
    elif event_name == 'not-detected':
        flow['is_not_detected'] = True
    elif event_name == 'detected':
        # A successful detection supersedes earlier guessed/not-detected
        # states; apply all changes with one update() call.
        changes = {
            'is_detected': True,
            'is_guessed': False,
            'is_not_detected': False,
        }
        if 'ndpi' in json_dict and 'flow_risk' in json_dict['ndpi']:
            changes['is_risky'] = True
        flow.update(changes)
    elif event_name in ('idle', 'end'):
        # Only mark the flow; the actual removal is done by update_pie.
        flow['remove_me'] = True

    return True


def nDPIsrvd_worker(address, nDPIsrvd_global_user_data):
    """Connect to nDPIsrvd and feed every received JSON line to the tracker.

    Parameters:
        address: where to connect to. A tuple is printed as
            ``<first>:<second>``; anything else is printed as is.
            NOTE(review): presumably a (host, port) tuple or a UNIX socket
            path as returned by nDPIsrvd.validateAddress -- confirm.
        nDPIsrvd_global_user_data: passed through unchanged to
            ``nDPIsrvdSocket.loop()``.

    Runs ``nDPIsrvdSocket.loop()`` with nDPIsrvd_worker_onJsonLineRecvd as
    the callback and returns whatever happens when that loop returns.
    """
    sys.stderr.write('Recv buffer size: {}\n'
                     .format(nDPIsrvd.NETWORK_BUFFER_MAX_SIZE))

    # isinstance() also accepts tuple subclasses such as named tuples.
    if isinstance(address, tuple):
        printable_address = '{}:{}'.format(address[0], address[1])
    else:
        printable_address = address
    sys.stderr.write('Connecting to {} ..\n'.format(printable_address))

    nsock = nDPIsrvdSocket()
    nsock.connect(address)
    nsock.loop(nDPIsrvd_worker_onJsonLineRecvd, nDPIsrvd_global_user_data)


if __name__ == '__main__':
    # Resolve the nDPIsrvd address from the command line.
    args = nDPIsrvd.defaultArgumentParser().parse_args()
    address = nDPIsrvd.validateAddress(args)

    # Reader process: receives flow events and updates the shared state.
    nDPIsrvd_job = multiprocessing.Process(
        target=nDPIsrvd_worker,
        args=(address, None),
    )
    nDPIsrvd_job.start()

    # Web process: serves the Dash application.
    web_job = multiprocessing.Process(target=web_worker)
    web_job.start()

    # The dashboard lives as long as the reader does: once the reader
    # process exits, the web process is terminated and reaped.
    nDPIsrvd_job.join()
    web_job.terminate()
    web_job.join()