summaryrefslogtreecommitdiff
path: root/examples/py-flow-muliprocess/py-flow-multiprocess.py
diff options
context:
space:
mode:
Diffstat (limited to 'examples/py-flow-muliprocess/py-flow-multiprocess.py')
-rwxr-xr-xexamples/py-flow-muliprocess/py-flow-multiprocess.py91
1 files changed, 91 insertions, 0 deletions
diff --git a/examples/py-flow-muliprocess/py-flow-multiprocess.py b/examples/py-flow-muliprocess/py-flow-multiprocess.py
new file mode 100755
index 000000000..3313b156b
--- /dev/null
+++ b/examples/py-flow-muliprocess/py-flow-multiprocess.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python3
+
+import multiprocessing
+import os
+import sys
+
+sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
+sys.path.append(os.path.dirname(sys.argv[0]) + '/../share/nDPId')
+sys.path.append(os.path.dirname(sys.argv[0]))
+sys.path.append(sys.base_prefix + '/share/nDPId')
+import nDPIsrvd
+from nDPIsrvd import nDPIsrvdSocket
+
+def mp_worker(unused, shared_flow_dict):
+ import time
+ while True:
+ s = str()
+ n = int()
+
+ for key in shared_flow_dict.keys():
+ try:
+ flow = shared_flow_dict[key]
+ except KeyError:
+ continue
+
+ s += '{}, '.format(str(flow.flow_id))
+ n += 1
+
+ if len(s) == 0:
+ s = '-'
+ else:
+ s = s[:-2]
+
+ print('Flows({}): {}'.format(n, s))
+ time.sleep(1)
+
+
+def nDPIsrvd_worker_onFlowCleanup(instance, current_flow, global_user_data):
+ shared_flow_dict = global_user_data
+
+ del shared_flow_dict[current_flow.flow_id]
+
+ return True
+
+
+def nDPIsrvd_worker_onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
+ shared_flow_dict = global_user_data
+
+ if 'flow_id' not in json_dict:
+ return True
+
+ shared_flow_dict[current_flow.flow_id] = current_flow
+
+ return True
+
+
+def nDPIsrvd_worker(address, shared_flow_dict):
+ sys.stderr.write('Recv buffer size: {}\n'.format(
+ nDPIsrvd.NETWORK_BUFFER_MAX_SIZE))
+ sys.stderr.write('Connecting to {} ..\n'.format(
+ address[0] + ':' +
+ str(address[1]) if type(address) is tuple else address))
+
+ nsock = nDPIsrvdSocket()
+ nsock.connect(address)
+ nsock.loop(nDPIsrvd_worker_onJsonLineRecvd,
+ nDPIsrvd_worker_onFlowCleanup,
+ shared_flow_dict)
+
+
+if __name__ == '__main__':
+ argparser = nDPIsrvd.defaultArgumentParser()
+ args = argparser.parse_args()
+ address = nDPIsrvd.validateAddress(args)
+
+ mgr = multiprocessing.Manager()
+ shared_flow_dict = mgr.dict()
+
+ nDPIsrvd_job = multiprocessing.Process(
+ target=nDPIsrvd_worker,
+ args=(address, shared_flow_dict))
+ nDPIsrvd_job.start()
+
+ mp_job = multiprocessing.Process(
+ target=mp_worker,
+ args=(None, shared_flow_dict))
+ mp_job.start()
+
+ nDPIsrvd_job.join()
+ mp_job.terminate()
+ mp_job.join()