summaryrefslogtreecommitdiff
path: root/examples/py-flow-muliprocess/py-flow-multiprocess.py
diff options
context:
space:
mode:
Diffstat (limited to 'examples/py-flow-muliprocess/py-flow-multiprocess.py')
-rwxr-xr-xexamples/py-flow-muliprocess/py-flow-multiprocess.py37
1 files changed, 26 insertions, 11 deletions
diff --git a/examples/py-flow-muliprocess/py-flow-multiprocess.py b/examples/py-flow-muliprocess/py-flow-multiprocess.py
index 91bc693bc..b90ab536d 100755
--- a/examples/py-flow-muliprocess/py-flow-multiprocess.py
+++ b/examples/py-flow-muliprocess/py-flow-multiprocess.py
@@ -19,28 +19,41 @@ def mp_worker(unused, shared_flow_dict):
import time
while True:
s = str()
+ n = int()
+
for key in shared_flow_dict.keys():
- s += '{}, '.format(str(key))
+ try:
+ flow = shared_flow_dict[key]
+ except KeyError:
+ continue
+
+ s += '{}, '.format(str(flow.flow_id))
+ n += 1
+
if len(s) == 0:
s = '-'
else:
s = s[:-2]
- print('Flows: {}'.format(s))
+
+ print('Flows({}): {}'.format(n, s))
time.sleep(1)
-def nDPIsrvd_worker_onJsonLineRecvd(json_dict, current_flow, global_user_data):
+def nDPIsrvd_worker_onFlowCleanup(instance, current_flow, global_user_data):
+ shared_flow_dict = global_user_data
+
+ del shared_flow_dict[current_flow.flow_id]
+
+ return True
+
+
+def nDPIsrvd_worker_onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
shared_flow_dict = global_user_data
- if 'flow_event_name' not in json_dict:
+ if 'flow_id' not in json_dict:
return True
- if json_dict['flow_event_name'] == 'new':
- shared_flow_dict[json_dict['flow_id']] = current_flow
- elif json_dict['flow_event_name'] == 'idle' or \
- json_dict['flow_event_name'] == 'end':
- if json_dict['flow_id'] in shared_flow_dict:
- del shared_flow_dict[json_dict['flow_id']]
+ shared_flow_dict[current_flow.flow_id] = current_flow
return True
@@ -54,7 +67,9 @@ def nDPIsrvd_worker(address, shared_flow_dict):
nsock = nDPIsrvdSocket()
nsock.connect(address)
- nsock.loop(nDPIsrvd_worker_onJsonLineRecvd, shared_flow_dict)
+ nsock.loop(nDPIsrvd_worker_onJsonLineRecvd,
+ nDPIsrvd_worker_onFlowCleanup,
+ shared_flow_dict)
if __name__ == '__main__':