summaryrefslogtreecommitdiff
path: root/examples/py-flow-muliprocess
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2021-12-15 23:25:32 +0100
committerToni Uhlig <matzeton@googlemail.com>2022-01-20 00:50:38 +0100
commit9e07a57566cc45bf92a845d8cee968d72e0f314e (patch)
tree8f1a6bfd08bd68a5253fadf3a01beecda77b1c95 /examples/py-flow-muliprocess
parenta35fc1d5ea8570609cc0c8cf6edadc81f8f5bb76 (diff)
Major nDPId extension. Sorry for the huge commit.
- nDPId: fixed invalid IP4/IP6 tuple compare - nDPIsrvd: fixed caching issue (finally) - added tiny c example (can be used to check flow manager sanity) - c-captured: use flow_last_seen timestamp from `struct nDPIsrvd_flow` - README.md update: added example JSON sequence - nDPId: added new flow event `update` necessary for correct timeout handling (and other future use-cases) - nDPIsrvd.h and nDPIsrvd.py: switched to an instance (consists of an alias/source tuple) based flow manager - every flow related event **must** now serialize `alias`, `source`, `flow_id`, `flow_last_seen` and `flow_idle_time` to make the timeout handling and verification process work correctly - nDPIsrvd.h: ability to profile any dynamic memory (de-)allocation - nDPIsrvd.py: removed PcapPacket class (unused) - py-flow-dashboard and py-flow-multiprocess: fixed race condition - py-flow-info: print statusbar with probably useful information - nDPId/nDPIsrvd.h: switched from packet-flow only timestamps (`pkt_*sec`) to a generic flow event timestamp `ts_msec` - nDPId-test: added additional checks - nDPId: increased ICMP flow timeout - nDPId: using event based i/o if capturing packets from a device - nDPIsrvd: fixed memory leak on shutdown if remote descriptors were still connected Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'examples/py-flow-muliprocess')
1 files changed, 26 insertions, 11 deletions
diff --git a/examples/py-flow-muliprocess/py-flow-multiprocess.py b/examples/py-flow-muliprocess/py-flow-multiprocess.py
index 91bc693bc..b90ab536d 100755
--- a/examples/py-flow-muliprocess/py-flow-multiprocess.py
+++ b/examples/py-flow-muliprocess/py-flow-multiprocess.py
@@ -19,28 +19,41 @@ def mp_worker(unused, shared_flow_dict):
import time
while True:
s = str()
+ n = int()
+
for key in shared_flow_dict.keys():
- s += '{}, '.format(str(key))
+ try:
+ flow = shared_flow_dict[key]
+ except KeyError:
+ continue
+
+ s += '{}, '.format(str(flow.flow_id))
+ n += 1
+
if len(s) == 0:
s = '-'
else:
s = s[:-2]
- print('Flows: {}'.format(s))
+
+ print('Flows({}): {}'.format(n, s))
time.sleep(1)
-def nDPIsrvd_worker_onJsonLineRecvd(json_dict, current_flow, global_user_data):
+def nDPIsrvd_worker_onFlowCleanup(instance, current_flow, global_user_data):
+ shared_flow_dict = global_user_data
+
+ del shared_flow_dict[current_flow.flow_id]
+
+ return True
+
+
+def nDPIsrvd_worker_onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
shared_flow_dict = global_user_data
- if 'flow_event_name' not in json_dict:
+ if 'flow_id' not in json_dict:
return True
- if json_dict['flow_event_name'] == 'new':
- shared_flow_dict[json_dict['flow_id']] = current_flow
- elif json_dict['flow_event_name'] == 'idle' or \
- json_dict['flow_event_name'] == 'end':
- if json_dict['flow_id'] in shared_flow_dict:
- del shared_flow_dict[json_dict['flow_id']]
+ shared_flow_dict[current_flow.flow_id] = current_flow
return True
@@ -54,7 +67,9 @@ def nDPIsrvd_worker(address, shared_flow_dict):
nsock = nDPIsrvdSocket()
nsock.connect(address)
- nsock.loop(nDPIsrvd_worker_onJsonLineRecvd, shared_flow_dict)
+ nsock.loop(nDPIsrvd_worker_onJsonLineRecvd,
+ nDPIsrvd_worker_onFlowCleanup,
+ shared_flow_dict)
if __name__ == '__main__':