author    | Toni Uhlig <matzeton@googlemail.com> | 2022-03-06 17:31:26 +0100
committer | Toni Uhlig <matzeton@googlemail.com> | 2022-03-06 17:38:05 +0100
commit    | 46f68501d575431656b5254a4bda8acb2982ab77 (patch)
tree      | 030c68ea408f61de131b93a51b1394648c4a7b85 /examples
parent    | 9db048c9d93a00adf4b258d2341b24229d2a45a1 (diff)
Added daemon event: DAEMON_EVENT_STATUS (periodically sends daemon statistics).
* Improved distributor timeout handling (per-thread).
* flow-info.py / flow-dash.py: Distinguish between flow risk severities.
* nDPId: Skip tag switch datalink packet dissection / processing.
* nDPId: Fixed incorrect value for current active flows.
* Improved JSON schemas.
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
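
A rough idea of how a distributor client might react to the new 'status' daemon event (a minimal Python sketch; the JSON keys are the ones printed by the flow-info.py hunk below, while the callback wiring is simplified and hypothetical):

    def on_json_line(json_dict):
        # 'status' is emitted periodically by nDPId as a daemon event; the
        # counter keys below appear in the flow-info.py hunk of this commit.
        if json_dict.get('daemon_event_name') != 'status':
            return
        print('[Processed: {} pkts][Flows][active: {} / {}|skipped: {}]'.format(
            json_dict['packets-processed'],
            json_dict['current-active-flows'],
            json_dict['total-active-flows'],
            json_dict['total-skipped-flows']))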
Diffstat (limited to 'examples')
-rw-r--r-- | examples/c-captured/c-captured.c                          |  13
-rwxr-xr-x | examples/py-flow-dashboard/flow-dash.py                   |  57
-rw-r--r-- | examples/py-flow-dashboard/plotly_dash.py                 |  11
-rwxr-xr-x | examples/py-flow-info/flow-info.py                        | 104
-rwxr-xr-x | examples/py-semantic-validation/py-semantic-validation.py |  92
5 files changed, 173 insertions, 104 deletions
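
One schema change below affects every consumer: the per-flow timestamp key ts_msec becomes thread_ts_msec, and daemon events may carry global_ts_msec instead. A defensive accessor for clients that must cope with both layouts could look like this (hypothetical helper, not part of this commit):

    def get_ts_msec(json_dict):
        # Prefer the new per-thread key, fall back to the daemon-global key,
        # then to the pre-commit legacy key.
        for key in ('thread_ts_msec', 'global_ts_msec', 'ts_msec'):
            if key in json_dict:
                return int(json_dict[key])
        return None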
diff --git a/examples/c-captured/c-captured.c b/examples/c-captured/c-captured.c
index 3437091f1..f8b716c53 100644
--- a/examples/c-captured/c-captured.c
+++ b/examples/c-captured/c-captured.c
@@ -370,8 +370,8 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock
         return CALLBACK_ERROR;
     }
 
-    nDPIsrvd_ull ts_msec = 0ull;
-    perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "ts_msec"), &ts_msec), "ts_msec");
+    nDPIsrvd_ull thread_ts_msec = 0ull;
+    perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "thread_ts_msec"), &thread_ts_msec), "thread_ts_msec");
 
     nDPIsrvd_ull pkt_len = 0ull;
     perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_len"), &pkt_len), "pkt_len");
@@ -382,8 +382,8 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock
     nDPIsrvd_ull pkt_l4_offset = 0ull;
     perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_l4_offset"), &pkt_l4_offset), "pkt_l4_offset");
 
-    struct packet_data pd = {.packet_ts_sec = ts_msec / 1000,
-                             .packet_ts_usec = (ts_msec % 1000) * 1000,
+    struct packet_data pd = {.packet_ts_sec = thread_ts_msec / 1000,
+                             .packet_ts_usec = (thread_ts_msec % 1000) * 1000,
                              .packet_len = pkt_len,
                              .base64_packet_size = pkt->value_length,
                              .base64_packet_const = pkt->value};
@@ -437,10 +437,11 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock
     {
         while ((current = token_get_next_child(sock, flow_risk, &next_child_index)) != NULL)
         {
-            nDPIsrvd_ull numeric_risk_value = 0ull;
+            nDPIsrvd_ull numeric_risk_value = (nDPIsrvd_ull)-1;
 
             if (TOKEN_KEY_TO_ULL(current, &numeric_risk_value) == CONVERSION_OK &&
-                numeric_risk_value < NDPI_MAX_RISK && has_ndpi_risk(&process_risky, numeric_risk_value) != 0)
+                numeric_risk_value < NDPI_MAX_RISK &&
+                has_ndpi_risk(&process_risky, numeric_risk_value) != 0)
             {
                 flow_user->risky = 1;
             }
diff --git a/examples/py-flow-dashboard/flow-dash.py b/examples/py-flow-dashboard/flow-dash.py
index 411029398..d6eb54bdf 100755
--- a/examples/py-flow-dashboard/flow-dash.py
+++ b/examples/py-flow-dashboard/flow-dash.py
@@ -11,6 +11,11 @@ import nDPIsrvd
 from nDPIsrvd import nDPIsrvdSocket
 import plotly_dash
 
+FLOW_RISK_SEVERE = 4
+FLOW_RISK_HIGH = 3
+FLOW_RISK_MEDIUM = 2
+FLOW_RISK_LOW = 1
+
 def nDPIsrvd_worker_onFlowCleanup(instance, current_flow, global_user_data):
     _, shared_flow_dict = global_user_data
 
@@ -35,9 +40,18 @@ def nDPIsrvd_worker_onFlowCleanup(instance, current_flow, global_user_data):
     if shared_flow_dict[flow_id]['is_midstream'] is True:
         shared_flow_dict['current-midstream-flows'] -= 1
 
-    if shared_flow_dict[flow_id]['is_risky'] is True:
+    if shared_flow_dict[flow_id]['is_risky'] > 0:
         shared_flow_dict['current-risky-flows'] -= 1
 
+        if shared_flow_dict[flow_id]['is_risky'] == FLOW_RISK_LOW:
+            shared_flow_dict['current-risky-flows-low'] -= 1
+        elif shared_flow_dict[flow_id]['is_risky'] == FLOW_RISK_MEDIUM:
+            shared_flow_dict['current-risky-flows-medium'] -= 1
+        elif shared_flow_dict[flow_id]['is_risky'] == FLOW_RISK_HIGH:
+            shared_flow_dict['current-risky-flows-high'] -= 1
+        elif shared_flow_dict[flow_id]['is_risky'] == FLOW_RISK_SEVERE:
+            shared_flow_dict['current-risky-flows-severe'] -= 1
+
     del shared_flow_dict[current_flow.flow_id]
 
     return True
@@ -72,7 +86,7 @@ def nDPIsrvd_worker_onJsonLineRecvd(json_dict, instance, current_flow, global_us
         shared_flow_dict[flow_id]['is_guessed'] = False
         shared_flow_dict[flow_id]['is_not_detected'] = False
         shared_flow_dict[flow_id]['is_midstream'] = False
-        shared_flow_dict[flow_id]['is_risky'] = False
+        shared_flow_dict[flow_id]['is_risky'] = 0
         shared_flow_dict[flow_id]['total-l4-bytes'] = 0
         shared_flow_dict[flow_id]['json'] = mgr.dict()
@@ -93,11 +107,33 @@ def nDPIsrvd_worker_onJsonLineRecvd(json_dict, instance, current_flow, global_us
     # XXX: Will make use of that JSON string in Plotly. Soon..
     shared_flow_dict[flow_id]['json']['ndpi'] = json_dict['ndpi']
 
-    if 'flow_risk' in json_dict['ndpi']:
-        if shared_flow_dict[flow_id]['is_risky'] is False:
-            shared_flow_dict['total-risky-flows'] += 1
-            shared_flow_dict['current-risky-flows'] += 1
-        shared_flow_dict[flow_id]['is_risky'] = True
+    if 'flow_risk' in json_dict['ndpi'] and shared_flow_dict[flow_id]['is_risky'] == 0:
+        shared_flow_dict['total-risky-flows'] += 1
+        shared_flow_dict['current-risky-flows'] += 1
+
+        severity = shared_flow_dict[flow_id]['is_risky']
+        for key in json_dict['ndpi']['flow_risk']:
+            if json_dict['ndpi']['flow_risk'][key]['severity'] == 'Low':
+                severity = max(severity, FLOW_RISK_LOW)
+            elif json_dict['ndpi']['flow_risk'][key]['severity'] == 'Medium':
+                severity = max(severity, FLOW_RISK_MEDIUM)
+            elif json_dict['ndpi']['flow_risk'][key]['severity'] == 'High':
+                severity = max(severity, FLOW_RISK_HIGH)
+            elif json_dict['ndpi']['flow_risk'][key]['severity'] == 'Severe':
+                severity = max(severity, FLOW_RISK_SEVERE)
+            else:
+                raise RuntimeError('Invalid flow risk severity: {}'.format(
+                    json_dict['ndpi']['flow_risk'][key]['severity']))
+        shared_flow_dict[flow_id]['is_risky'] = severity
+
+        if shared_flow_dict[flow_id]['is_risky'] == FLOW_RISK_LOW:
+            shared_flow_dict['current-risky-flows-low'] += 1
+        elif shared_flow_dict[flow_id]['is_risky'] == FLOW_RISK_MEDIUM:
+            shared_flow_dict['current-risky-flows-medium'] += 1
+        elif shared_flow_dict[flow_id]['is_risky'] == FLOW_RISK_HIGH:
+            shared_flow_dict['current-risky-flows-high'] += 1
+        elif shared_flow_dict[flow_id]['is_risky'] == FLOW_RISK_SEVERE:
+            shared_flow_dict['current-risky-flows-severe'] += 1
 
     if 'flow_event_name' not in json_dict:
         return True
@@ -221,11 +257,16 @@ if __name__ == '__main__':
 
     shared_flow_dict['current-flows'] = 0
     shared_flow_dict['current-detected-flows'] = 0
-    shared_flow_dict['current-risky-flows'] = 0
     shared_flow_dict['current-midstream-flows'] = 0
     shared_flow_dict['current-guessed-flows'] = 0
     shared_flow_dict['current-not-detected-flows'] = 0
 
+    shared_flow_dict['current-risky-flows'] = 0
+    shared_flow_dict['current-risky-flows-severe'] = 0
+    shared_flow_dict['current-risky-flows-high'] = 0
+    shared_flow_dict['current-risky-flows-medium'] = 0
+    shared_flow_dict['current-risky-flows-low'] = 0
+
     nDPIsrvd_job = multiprocessing.Process(target=nDPIsrvd_worker,
                                            args=(address, shared_flow_dict))
     nDPIsrvd_job.start()
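
The severity bookkeeping above boils down to ranking each of a flow's risks and keeping the maximum. Condensed into a standalone helper, it might read as follows (a sketch, helper name hypothetical; the 'severity' strings are the ones handled in the hunks above):

    SEVERITY_RANK = {'Low': 1, 'Medium': 2, 'High': 3, 'Severe': 4}

    def max_severity(flow_risk):
        # flow_risk is the 'flow_risk' sub-dict of an nDPId JSON line; each
        # entry carries a 'severity' string. 0 means no known risk.
        severity = 0
        for risk in flow_risk.values():
            if risk['severity'] not in SEVERITY_RANK:
                raise RuntimeError('Invalid flow risk severity: {}'.format(risk['severity']))
            severity = max(severity, SEVERITY_RANK[risk['severity']])
        return severity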
diff --git a/examples/py-flow-dashboard/plotly_dash.py b/examples/py-flow-dashboard/plotly_dash.py
index 3577776cf..adc103afb 100644
--- a/examples/py-flow-dashboard/plotly_dash.py
+++ b/examples/py-flow-dashboard/plotly_dash.py
@@ -89,7 +89,7 @@ def build_piechart(labels, values, color_map=None):
 COLOR_MAP = {
     'piechart-flows': ['rgb(153, 153, 255)', 'rgb(153, 204, 255)', 'rgb(255, 204, 153)', 'rgb(255, 255, 255)'],
     'piechart-midstream-flows': ['rgb(255, 255, 153)', 'rgb(153, 153, 255)'],
-    'piechart-risky-flows': ['rgb(255, 153, 153)', 'rgb(153, 153, 255)'],
+    'piechart-risky-flows': ['rgb(255, 0, 0)', 'rgb(255, 128, 0)', 'rgb(255, 255, 0)', 'rgb(128, 255, 0)', 'rgb(153, 153, 255)'],
     'graph-flows': {'Current Active Flows': {'color': 'rgb(153, 153, 255)', 'width': 1},
                     'Current Risky Flows': {'color': 'rgb(255, 153, 153)', 'width': 3},
                     'Current Midstream Flows': {'color': 'rgb(255, 255, 153)', 'width': 3},
@@ -150,7 +150,7 @@ def generate_tab_flow():
                 config={
                     'displayModeBar': False,
                 },
-                figure=build_piechart(['Risky', 'Not Risky'],
+                figure=build_piechart(['Severe Risk', 'High Risk', 'Medium Risk', 'Low Risk', 'No Risk'],
                                       [0, 0],
                                       COLOR_MAP['piechart-risky-flows']),
             ),
         ], style={'padding': 10, 'flex': 1}),
@@ -257,8 +257,11 @@ def tab_flow_update_components(n):
                             shared_flow_dict['current-flows'] -
                             shared_flow_dict['current-midstream-flows']],
                            COLOR_MAP['piechart-midstream-flows']),
-            build_piechart(['Risky', 'Not Risky'],
-                           [shared_flow_dict['current-risky-flows'],
+            build_piechart(['Severe', 'High', 'Medium', 'Low', 'No Risk'],
+                           [shared_flow_dict['current-risky-flows-severe'],
+                            shared_flow_dict['current-risky-flows-high'],
+                            shared_flow_dict['current-risky-flows-medium'],
+                            shared_flow_dict['current-risky-flows-low'],
                             shared_flow_dict['current-flows'] -
                             shared_flow_dict['current-risky-flows']],
                            COLOR_MAP['piechart-risky-flows'])]
diff --git a/examples/py-flow-info/flow-info.py b/examples/py-flow-info/flow-info.py
index 06bbf83fb..7df843ccb 100755
--- a/examples/py-flow-info/flow-info.py
+++ b/examples/py-flow-info/flow-info.py
@@ -229,18 +229,57 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
 
     instance_and_source = ''
     if args.hide_instance_info is False:
-        instance_and_source += '[{}]'.format(TermColor.setColorByString(instance.alias))
-        instance_and_source += '[{}] '.format(TermColor.setColorByString(instance.source))
+        instance_and_source += '[{}][{}][{:.>2}] '.format(
+            TermColor.setColorByString(instance.alias),
+            TermColor.setColorByString(instance.source),
+            json_dict['thread_id'])
+    else:
+        instance_and_source += ' '
+
+    basic_daemon_event_prefix = ''
+    timestamp = ''
+    if args.print_timestamp is True:
+        if 'thread_ts_msec' in json_dict:
+            timestamp += '[{}]'.format(time.strftime('%H:%M:%S',
+                                       time.localtime(json_dict['thread_ts_msec'] / 1000)))
+        elif 'global_ts_msec' in json_dict:
+            timestamp += '[{}]'.format(time.strftime('%H:%M:%S',
+                                       time.localtime(json_dict['global_ts_msec'] / 1000)))
+
+    first_seen = ''
+    if args.print_first_seen is True:
+        basic_daemon_event_prefix += ' ' * 11
+        if 'flow_first_seen' in json_dict:
+            first_seen = '[' + prettifyTimediff(json_dict['flow_first_seen'] / 1000,
+                                                json_dict['thread_ts_msec'] / 1000) + ']'
+
+    last_seen = ''
+    if args.print_last_seen is True:
+        basic_daemon_event_prefix += ' ' * 11
+        if 'flow_last_seen' in json_dict:
+            last_seen = '[' + prettifyTimediff(json_dict['flow_last_seen'] / 1000,
+                                               json_dict['thread_ts_msec'] / 1000) + ']'
 
     if 'daemon_event_id' in json_dict:
-        print('{} {}: {}'.format(instance_and_source,
-                                 prettifyEvent([TermColor.WARNING, TermColor.BLINK], 16, 'DAEMON-EVENT'),
-                                 json_dict['daemon_event_name']))
+        if json_dict['daemon_event_name'] == 'status':
+            color = [TermColor.WARNING]
+            daemon_msg = '[Processed: {} pkts][Flows][active: {} / {}|skipped: {}|!detected: {}' \
+                         '|guessed: {}|detection-updates: {}|updates: {}]'.format(
+                         json_dict['packets-processed'],
+                         json_dict['current-active-flows'], json_dict['total-active-flows'],
+                         json_dict['total-skipped-flows'],
+                         json_dict['total-not-detected-flows'], json_dict['total-guessed-flows'],
+                         json_dict['total-detection-updates'], json_dict['total-updates'])
+        else:
+            color = [TermColor.WARNING, TermColor.BLINK]
+            daemon_msg = json_dict['daemon_event_name']
+        print('{}{}{} {}: {}'.format(timestamp, basic_daemon_event_prefix, instance_and_source,
+                                     prettifyEvent(color, 15, 'DAEMON-EVENT'), daemon_msg))
         stats.printStatus()
         return True
 
     if 'basic_event_id' in json_dict:
-        print('{} {}: {}'.format(instance_and_source,
-                                 prettifyEvent([TermColor.FAIL, TermColor.BLINK], 16, 'BASIC-EVENT'),
+        print('{}{}{} {}: {}'.format(timestamp, basic_daemon_event_prefix, instance_and_source,
+                                     prettifyEvent([TermColor.FAIL, TermColor.BLINK], 15, 'BASIC-EVENT'),
                                      json_dict['basic_event_name']))
         stats.printStatus()
         return True
@@ -252,28 +291,6 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
         stats.printStatus()
         return True
 
-    timestamp = ''
-    if args.print_timestamp is True and 'ts_msec' in json_dict:
-        timestamp += '[{}]'.format(time.strftime('%H:%M:%S',
-                                   time.localtime(json_dict['ts_msec'] / 1000)))
-
-    first_seen = ''
-    if args.print_first_seen is True and 'flow_first_seen' in json_dict:
-        first_seen = '[' + prettifyTimediff(json_dict['flow_first_seen'] / 1000,
-                                            json_dict['ts_msec'] / 1000) + ']'
-
-    last_seen = ''
-    if args.print_last_seen is True and 'flow_last_seen' in json_dict:
-        last_seen = '[' + prettifyTimediff(json_dict['flow_last_seen'] / 1000,
-                                           json_dict['ts_msec'] / 1000) + ']'
-
-    if len(last_seen) > 0:
-        last_seen += ' '
-    elif len(timestamp) > 0:
-        timestamp += ' '
-    elif len(first_seen) > 0:
-        first_seen += ' '
-
     ndpi_proto_categ_breed = ''
     ndpi_frisk = ''
 
@@ -288,14 +305,33 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
         ndpi_proto_categ_breed += '[' + str(json_dict['ndpi']['breed']) + ']'
 
     if 'flow_risk' in json_dict['ndpi']:
+        severity = 0
         cnt = 0
+
         for key in json_dict['ndpi']['flow_risk']:
-            ndpi_frisk += str(json_dict['ndpi']['flow_risk'][key]) + ', '
+            ndpi_frisk += str(json_dict['ndpi']['flow_risk'][key]['risk']) + ', '
+            if json_dict['ndpi']['flow_risk'][key]['severity'] == 'Low':
+                severity = max(severity, 1)
+            elif json_dict['ndpi']['flow_risk'][key]['severity'] == 'Medium':
+                severity = max(severity, 2)
+            elif json_dict['ndpi']['flow_risk'][key]['severity'] == 'High':
+                severity = max(severity, 3)
+            elif json_dict['ndpi']['flow_risk'][key]['severity'] == 'Severe':
+                severity = max(severity, 4)
             cnt += 1
-        ndpi_frisk = '{}: {}'.format(
-            TermColor.WARNING + TermColor.BOLD + 'RISK' + TermColor.END if cnt < 2
-            else TermColor.FAIL + TermColor.BOLD + TermColor.BLINK + 'RISK' + TermColor.END,
-            ndpi_frisk[:-2])
+
+        if severity == 1:
+            color = TermColor.WARNING + TermColor.BOLD
+        elif severity == 2:
+            color = TermColor.WARNING + TermColor.BOLD + TermColor.BLINK
+        elif severity == 3:
+            color = TermColor.FAIL + TermColor.BOLD
+        elif severity == 4:
+            color = TermColor.FAIL + TermColor.BOLD + TermColor.BLINK
+        else:
+            color = ''
+
+        ndpi_frisk = '{}{}{}: {}'.format(color, 'RISK', TermColor.END, ndpi_frisk[:-2])
 
     line_suffix = ''
     flow_event_name = ''
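
Since flow_first_seen, flow_last_seen and thread_ts_msec are all millisecond timestamps, deriving display values the way flow-info.py does above is a short exercise (a sketch; assumes both keys are present in the JSON line):

    import time

    def flow_age_str(json_dict):
        # Age of the flow relative to the per-thread clock, formatted in the
        # bracketed style that flow-info.py prints.
        age_sec = (json_dict['thread_ts_msec'] - json_dict['flow_first_seen']) / 1000.0
        now_str = time.strftime('%H:%M:%S', time.localtime(json_dict['thread_ts_msec'] / 1000))
        return '[{}][age: {:.1f}s]'.format(now_str, age_sec)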
diff --git a/examples/py-semantic-validation/py-semantic-validation.py b/examples/py-semantic-validation/py-semantic-validation.py
index 3ca90edcf..bce0355de 100755
--- a/examples/py-semantic-validation/py-semantic-validation.py
+++ b/examples/py-semantic-validation/py-semantic-validation.py
@@ -21,7 +21,7 @@ class Stats:
         self.nsock = nDPIsrvd_sock
 
     def resetEventCounter(self):
-        keys = ['init','reconnect','shutdown', \
+        keys = ['init','reconnect','shutdown','status', \
                 'new','end','idle','update', \
                 'guessed','detected','detection-update','not-detected', \
                 'packet', 'packet-flow']
@@ -53,7 +53,7 @@ class Stats:
         return True
 
     def getEventCounterStr(self):
-        keys = [ [ 'init','reconnect','shutdown' ], \
+        keys = [ [ 'init','reconnect','shutdown','status' ], \
                  [ 'new','end','idle','update' ], \
                  [ 'guessed','detected','detection-update','not-detected' ], \
                  [ 'packet', 'packet-flow' ] ]
@@ -76,8 +76,6 @@ class SemanticValidationException(Exception):
         return 'Flow ID {}: {}'.format(self.current_flow.flow_id, self.text)
 
 def onFlowCleanup(instance, current_flow, global_user_data):
-    _, enable_timeout_check, _ = global_user_data
-
     if type(instance) is not nDPIsrvd.Instance:
         raise SemanticValidationException(current_flow,
                                           'instance is not of type nDPIsrvd.Instance: ' \
@@ -99,37 +97,32 @@ def onFlowCleanup(instance, current_flow, global_user_data):
         raise SemanticValidationException(current_flow,
                                           'Unexpected flow cleanup reason: CLEANUP_REASON_FLOW_TIMEOUT')
 
-    if enable_timeout_check is True:
-        try:
-            l4_proto = current_flow.l4_proto
-        except AttributeError:
-            l4_proto = 'n/a'
-
-        invalid_flows = stats.nsock.verify()
-        if len(invalid_flows) > 0:
-            invalid_flows_str = ''
-            for flow_id in invalid_flows:
-                flow = instance.flows[flow_id]
-                try:
-                    l4_proto = flow.l4_proto
-                except AttributeError:
-                    l4_proto = 'n/a'
-                invalid_flows_str += '{} proto[{},{}] ts[{} + {} < {}] diff[{}], '.format(flow_id, l4_proto, flow.flow_idle_time,
-                                                                                          flow.flow_last_seen, flow.flow_idle_time,
-                                                                                          instance.most_recent_flow_time,
-                                                                                          instance.most_recent_flow_time -
-                                                                                          (flow.flow_last_seen + flow.flow_idle_time))
-
-            raise SemanticValidationException(None, 'Flow Manager verification failed for: {}'.format(invalid_flows_str[:-2]))
+    try:
+        l4_proto = current_flow.l4_proto
+    except AttributeError:
+        l4_proto = 'n/a'
 
-    return True
+    invalid_flows = stats.nsock.verify()
+    if len(invalid_flows) > 0:
+        invalid_flows_str = ''
+        for flow_id in invalid_flows:
+            flow = instance.flows[flow_id]
+            try:
+                l4_proto = flow.l4_proto
+            except AttributeError:
+                l4_proto = 'n/a'
+            invalid_flows_str += '{} proto[{},{}] ts[{} + {} < {}] diff[{}], '.format(flow_id, l4_proto, flow.flow_idle_time,
+                                                                                      flow.flow_last_seen, flow.flow_idle_time,
+                                                                                      instance.most_recent_flow_time,
+                                                                                      instance.most_recent_flow_time -
+                                                                                      (flow.flow_last_seen + flow.flow_idle_time))
 
-class ThreadData(object):
-    lowest_possible_flow_id = 0
-    lowest_possible_packet_id = 0
+        raise SemanticValidationException(None, 'Flow Manager verification failed for: {}'.format(invalid_flows_str[:-2]))
+
+    return True
 
 def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
-    _, _, stats = global_user_data
+    _, stats = global_user_data
     stats.incrementEventCounter(json_dict)
 
     if type(instance) is not nDPIsrvd.Instance:
@@ -149,15 +142,7 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
                                           'stats is not of type Stats: ' \
                                           '{}'.format(type(stats)))
 
-    try:
-        thread_data_dict = instance.thread_data
-    except AttributeError:
-        thread_data_dict = instance.thread_data = dict()
-
-    if json_dict['thread_id'] in thread_data_dict:
-        td = thread_data_dict[json_dict['thread_id']]
-    else:
-        td = thread_data_dict[json_dict['thread_id']] = ThreadData()
+    td = instance.getThreadDataFromJSON(json_dict)
 
     for event_name in ['basic_event_name', 'daemon_event_name',
                        'packet_event_name', 'flow_event_name']:
@@ -165,8 +150,12 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
         raise SemanticValidationException(current_flow,
                                           'Received an invalid event for {}'.format(event_name))
 
-    lowest_possible_flow_id = td.lowest_possible_flow_id
-    lowest_possible_packet_id = td.lowest_possible_packet_id
+    if td is not None:
+        lowest_possible_flow_id = getattr(td, 'lowest_possible_flow_id', 0)
+        lowest_possible_packet_id = getattr(td, 'lowest_possible_packet_id', 0)
+    else:
+        lowest_possible_flow_id = 0
+        lowest_possible_packet_id = 0
 
     if current_flow is not None:
@@ -203,8 +192,8 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
                                               'both required for timeout handling:' \
                                               'flow_last_seen, flow_idle_time')
 
-        if 'ts_msec' in json_dict:
-            current_flow.ts_msec = int(json_dict['ts_msec'])
+        if 'thread_ts_msec' in json_dict:
+            current_flow.thread_ts_msec = int(json_dict['thread_ts_msec'])
 
         if 'flow_packet_id' in json_dict:
             try:
@@ -233,7 +222,8 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
                                               'got {}'.format(json_dict['thread_id'],
                                                               lowest_possible_packet_id,
                                                               json_dict['packet_id']))
-        td.lowest_possible_packet_id = lowest_possible_packet_id
+        if td is not None:
+            td.lowest_possible_packet_id = lowest_possible_packet_id
 
     if 'flow_id' in json_dict:
         if current_flow.flow_id != json_dict['flow_id']:
@@ -275,13 +265,13 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
             pass
 
         try:
-            if json_dict['flow_first_seen'] > current_flow.ts_msec or \
-               json_dict['flow_last_seen'] > current_flow.ts_msec or \
+            if json_dict['flow_first_seen'] > current_flow.thread_ts_msec or \
+               json_dict['flow_last_seen'] > current_flow.thread_ts_msec or \
                json_dict['flow_first_seen'] > json_dict['flow_last_seen']:
                 raise SemanticValidationException(current_flow,
                                                   'Last packet timestamp is invalid: ' \
                                                   'first_seen({}) <= {} >= last_seen({})'.format(json_dict['flow_first_seen'],
-                                                                                                 current_flow.ts_msec,
+                                                                                                 current_flow.thread_ts_msec,
                                                                                                  json_dict['flow_last_seen']))
         except AttributeError:
             if json_dict['flow_event_name'] == 'new':
@@ -303,7 +293,7 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
                 pass
             current_flow.flow_new_seen = True
            current_flow.flow_packet_id = 0
-            if lowest_possible_flow_id == 0:
+            if lowest_possible_flow_id == 0 and td is not None:
                 td.lowest_possible_flow_id = current_flow.flow_id
         elif json_dict['flow_event_name'] == 'detected' or \
              json_dict['flow_event_name'] == 'not-detected':
@@ -337,8 +327,6 @@ if __name__ == '__main__':
     argparser = nDPIsrvd.defaultArgumentParser()
     argparser.add_argument('--strict', action='store_true', default=False,
                            help='Require and validate a full nDPId application lifecycle.')
-    argparser.add_argument('--enable-timeout-check', action='store_true', default=False,
-                           help='Enable additional flow timeout validation. See README.md for more information')
     args = argparser.parse_args()
 
     address = nDPIsrvd.validateAddress(args)
@@ -349,7 +337,7 @@ if __name__ == '__main__':
     nsock.connect(address)
     stats = Stats(nsock)
     try:
-        nsock.loop(onJsonLineRecvd, onFlowCleanup, (args.strict, args.enable_timeout_check, stats))
+        nsock.loop(onJsonLineRecvd, onFlowCleanup, (args.strict, stats))
     except nDPIsrvd.SocketConnectionBroken as err:
         sys.stderr.write('\n{}\n'.format(err))
     except KeyboardInterrupt:
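
For reference, the invariant behind the stats.nsock.verify() call in the hunk above: a tracked flow is overdue once flow_last_seen + flow_idle_time falls behind the instance's most recent flow time, which is exactly what the 'ts[... + ... < ...] diff[...]' fields of the verification error message encode. Expressed directly (a sketch; only the attribute names come from the diff, the helper itself is hypothetical):

    def find_overdue_flows(instance):
        # A flow should have been cleaned up once
        #   flow_last_seen + flow_idle_time < instance.most_recent_flow_time.
        overdue = []
        for flow_id, flow in instance.flows.items():
            last_seen = getattr(flow, 'flow_last_seen', None)
            idle_time = getattr(flow, 'flow_idle_time', None)
            if last_seen is None or idle_time is None:
                continue
            if last_seen + idle_time < instance.most_recent_flow_time:
                overdue.append(flow_id)
        return overdue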