diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2021-05-26 17:12:45 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2021-05-26 17:18:20 +0200 |
commit | e3d1a8a772551ed0a4912a1413abafd02cfe12cd (patch) | |
tree | 4917c212b4fe56e620760b86e6403a19cd1cf333 /examples | |
parent | 4b6ead68a1c4b1df9f755466277cbb92e6b2e0f7 (diff) |
Added simple Python Multiprocess example.
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'examples')
-rw-r--r-- | examples/README.md | 4 | ||||
-rw-r--r-- | examples/py-flow-muliprocess/py-flow-multiprocess.py | 80 |
2 files changed, 84 insertions, 0 deletions
diff --git a/examples/README.md b/examples/README.md index 80d4464dd..22ff8874b 100644 --- a/examples/README.md +++ b/examples/README.md @@ -25,6 +25,10 @@ A discontinued tty UI nDPId dashboard. I've figured out that Go + UI is a bad id Prints prettyfied information about flow events. +## py-flow-multiprocess + +Simple Python Multiprocess example spawning two worker processes, one connecting to nDPIsrvd and one printing flow id's to STDOUT. + ## py-flow-undetected-to-pcap Captures and saves undetected flows to a PCAP file. diff --git a/examples/py-flow-muliprocess/py-flow-multiprocess.py b/examples/py-flow-muliprocess/py-flow-multiprocess.py new file mode 100644 index 000000000..91bc693bc --- /dev/null +++ b/examples/py-flow-muliprocess/py-flow-multiprocess.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 + +import multiprocessing +import os +import sys + +sys.path.append(os.path.dirname(sys.argv[0]) + '/../share/nDPId') +sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId') +try: + import nDPIsrvd + from nDPIsrvd import nDPIsrvdSocket +except ImportError: + sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies') + import nDPIsrvd + from nDPIsrvd import nDPIsrvdSocket + + +def mp_worker(unused, shared_flow_dict): + import time + while True: + s = str() + for key in shared_flow_dict.keys(): + s += '{}, '.format(str(key)) + if len(s) == 0: + s = '-' + else: + s = s[:-2] + print('Flows: {}'.format(s)) + time.sleep(1) + + +def nDPIsrvd_worker_onJsonLineRecvd(json_dict, current_flow, global_user_data): + shared_flow_dict = global_user_data + + if 'flow_event_name' not in json_dict: + return True + + if json_dict['flow_event_name'] == 'new': + shared_flow_dict[json_dict['flow_id']] = current_flow + elif json_dict['flow_event_name'] == 'idle' or \ + json_dict['flow_event_name'] == 'end': + if json_dict['flow_id'] in shared_flow_dict: + del shared_flow_dict[json_dict['flow_id']] + + return True + + +def nDPIsrvd_worker(address, shared_flow_dict): + sys.stderr.write('Recv buffer size: {}\n'.format( + nDPIsrvd.NETWORK_BUFFER_MAX_SIZE)) + sys.stderr.write('Connecting to {} ..\n'.format( + address[0] + ':' + + str(address[1]) if type(address) is tuple else address)) + + nsock = nDPIsrvdSocket() + nsock.connect(address) + nsock.loop(nDPIsrvd_worker_onJsonLineRecvd, shared_flow_dict) + + +if __name__ == '__main__': + argparser = nDPIsrvd.defaultArgumentParser() + args = argparser.parse_args() + address = nDPIsrvd.validateAddress(args) + + mgr = multiprocessing.Manager() + shared_flow_dict = mgr.dict() + + nDPIsrvd_job = multiprocessing.Process( + target=nDPIsrvd_worker, + args=(address, shared_flow_dict)) + nDPIsrvd_job.start() + + mp_job = multiprocessing.Process( + target=mp_worker, + args=(None, shared_flow_dict)) + mp_job.start() + + nDPIsrvd_job.join() + mp_job.terminate() + mp_job.join() |