summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2022-03-06 17:31:26 +0100
committerToni Uhlig <matzeton@googlemail.com>2022-03-06 17:38:05 +0100
commit46f68501d575431656b5254a4bda8acb2982ab77 (patch)
tree030c68ea408f61de131b93a51b1394648c4a7b85 /examples
parent9db048c9d93a00adf4b258d2341b24229d2a45a1 (diff)
Added daemon event: DAEMON_EVENT_STATUS (periodically send's daemon statistics.)
* Improved distributor timeout handling (per-thread). * flow-info.py / flow-dash.py: Distinguish between flow risk severities. * nDPId: Skip tag switch datalink packet dissection / processing. * nDPId: Fixed incorrect value for current active flows. * Improved JSON schema's. Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'examples')
-rw-r--r--examples/c-captured/c-captured.c13
-rwxr-xr-xexamples/py-flow-dashboard/flow-dash.py57
-rw-r--r--examples/py-flow-dashboard/plotly_dash.py11
-rwxr-xr-xexamples/py-flow-info/flow-info.py104
-rwxr-xr-xexamples/py-semantic-validation/py-semantic-validation.py92
5 files changed, 173 insertions, 104 deletions
diff --git a/examples/c-captured/c-captured.c b/examples/c-captured/c-captured.c
index 3437091f1..f8b716c53 100644
--- a/examples/c-captured/c-captured.c
+++ b/examples/c-captured/c-captured.c
@@ -370,8 +370,8 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock
return CALLBACK_ERROR;
}
- nDPIsrvd_ull ts_msec = 0ull;
- perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "ts_msec"), &ts_msec), "ts_msec");
+ nDPIsrvd_ull thread_ts_msec = 0ull;
+ perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "thread_ts_msec"), &thread_ts_msec), "thread_ts_msec");
nDPIsrvd_ull pkt_len = 0ull;
perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_len"), &pkt_len), "pkt_len");
@@ -382,8 +382,8 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock
nDPIsrvd_ull pkt_l4_offset = 0ull;
perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_l4_offset"), &pkt_l4_offset), "pkt_l4_offset");
- struct packet_data pd = {.packet_ts_sec = ts_msec / 1000,
- .packet_ts_usec = (ts_msec % 1000) * 1000,
+ struct packet_data pd = {.packet_ts_sec = thread_ts_msec / 1000,
+ .packet_ts_usec = (thread_ts_msec % 1000) * 1000,
.packet_len = pkt_len,
.base64_packet_size = pkt->value_length,
.base64_packet_const = pkt->value};
@@ -437,10 +437,11 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock
{
while ((current = token_get_next_child(sock, flow_risk, &next_child_index)) != NULL)
{
- nDPIsrvd_ull numeric_risk_value = 0ull;
+ nDPIsrvd_ull numeric_risk_value = (nDPIsrvd_ull)-1;
if (TOKEN_KEY_TO_ULL(current, &numeric_risk_value) == CONVERSION_OK &&
- numeric_risk_value < NDPI_MAX_RISK && has_ndpi_risk(&process_risky, numeric_risk_value) != 0)
+ numeric_risk_value < NDPI_MAX_RISK &&
+ has_ndpi_risk(&process_risky, numeric_risk_value) != 0)
{
flow_user->risky = 1;
}
diff --git a/examples/py-flow-dashboard/flow-dash.py b/examples/py-flow-dashboard/flow-dash.py
index 411029398..d6eb54bdf 100755
--- a/examples/py-flow-dashboard/flow-dash.py
+++ b/examples/py-flow-dashboard/flow-dash.py
@@ -11,6 +11,11 @@ import nDPIsrvd
from nDPIsrvd import nDPIsrvdSocket
import plotly_dash
+FLOW_RISK_SEVERE = 4
+FLOW_RISK_HIGH = 3
+FLOW_RISK_MEDIUM = 2
+FLOW_RISK_LOW = 1
+
def nDPIsrvd_worker_onFlowCleanup(instance, current_flow, global_user_data):
_, shared_flow_dict = global_user_data
@@ -35,9 +40,18 @@ def nDPIsrvd_worker_onFlowCleanup(instance, current_flow, global_user_data):
if shared_flow_dict[flow_id]['is_midstream'] is True:
shared_flow_dict['current-midstream-flows'] -= 1
- if shared_flow_dict[flow_id]['is_risky'] is True:
+ if shared_flow_dict[flow_id]['is_risky'] > 0:
shared_flow_dict['current-risky-flows'] -= 1
+ if shared_flow_dict[flow_id]['is_risky'] == FLOW_RISK_LOW:
+ shared_flow_dict['current-risky-flows-low'] -= 1
+ elif shared_flow_dict[flow_id]['is_risky'] == FLOW_RISK_MEDIUM:
+ shared_flow_dict['current-risky-flows-medium'] -= 1
+ elif shared_flow_dict[flow_id]['is_risky'] == FLOW_RISK_HIGH:
+ shared_flow_dict['current-risky-flows-high'] -= 1
+ elif shared_flow_dict[flow_id]['is_risky'] == FLOW_RISK_SEVERE:
+ shared_flow_dict['current-risky-flows-severe'] -= 1
+
del shared_flow_dict[current_flow.flow_id]
return True
@@ -72,7 +86,7 @@ def nDPIsrvd_worker_onJsonLineRecvd(json_dict, instance, current_flow, global_us
shared_flow_dict[flow_id]['is_guessed'] = False
shared_flow_dict[flow_id]['is_not_detected'] = False
shared_flow_dict[flow_id]['is_midstream'] = False
- shared_flow_dict[flow_id]['is_risky'] = False
+ shared_flow_dict[flow_id]['is_risky'] = 0
shared_flow_dict[flow_id]['total-l4-bytes'] = 0
shared_flow_dict[flow_id]['json'] = mgr.dict()
@@ -93,11 +107,33 @@ def nDPIsrvd_worker_onJsonLineRecvd(json_dict, instance, current_flow, global_us
# XXX: Will make use of that JSON string in Plotly. Soon..
shared_flow_dict[flow_id]['json']['ndpi'] = json_dict['ndpi']
- if 'flow_risk' in json_dict['ndpi']:
- if shared_flow_dict[flow_id]['is_risky'] is False:
- shared_flow_dict['total-risky-flows'] += 1
- shared_flow_dict['current-risky-flows'] += 1
- shared_flow_dict[flow_id]['is_risky'] = True
+ if 'flow_risk' in json_dict['ndpi'] and shared_flow_dict[flow_id]['is_risky'] == 0:
+ shared_flow_dict['total-risky-flows'] += 1
+ shared_flow_dict['current-risky-flows'] += 1
+
+ severity = shared_flow_dict[flow_id]['is_risky']
+ for key in json_dict['ndpi']['flow_risk']:
+ if json_dict['ndpi']['flow_risk'][key]['severity'] == 'Low':
+ severity = max(severity, FLOW_RISK_LOW)
+ elif json_dict['ndpi']['flow_risk'][key]['severity'] == 'Medium':
+ severity = max(severity, FLOW_RISK_MEDIUM)
+ elif json_dict['ndpi']['flow_risk'][key]['severity'] == 'High':
+ severity = max(severity, FLOW_RISK_HIGH)
+ elif json_dict['ndpi']['flow_risk'][key]['severity'] == 'Severe':
+ severity = max(severity, FLOW_RISK_SEVERE)
+ else:
+ raise RuntimeError('Invalid flow risk severity: {}'.format(
+ json_dict['ndpi']['flow_risk'][key]['severity']))
+ shared_flow_dict[flow_id]['is_risky'] = severity
+
+ if shared_flow_dict[flow_id]['is_risky'] == FLOW_RISK_LOW:
+ shared_flow_dict['current-risky-flows-low'] += 1
+ elif shared_flow_dict[flow_id]['is_risky'] == FLOW_RISK_MEDIUM:
+ shared_flow_dict['current-risky-flows-medium'] += 1
+ elif shared_flow_dict[flow_id]['is_risky'] == FLOW_RISK_HIGH:
+ shared_flow_dict['current-risky-flows-high'] += 1
+ elif shared_flow_dict[flow_id]['is_risky'] == FLOW_RISK_SEVERE:
+ shared_flow_dict['current-risky-flows-severe'] += 1
if 'flow_event_name' not in json_dict:
return True
@@ -221,11 +257,16 @@ if __name__ == '__main__':
shared_flow_dict['current-flows'] = 0
shared_flow_dict['current-detected-flows'] = 0
- shared_flow_dict['current-risky-flows'] = 0
shared_flow_dict['current-midstream-flows'] = 0
shared_flow_dict['current-guessed-flows'] = 0
shared_flow_dict['current-not-detected-flows'] = 0
+ shared_flow_dict['current-risky-flows'] = 0
+ shared_flow_dict['current-risky-flows-severe'] = 0
+ shared_flow_dict['current-risky-flows-high'] = 0
+ shared_flow_dict['current-risky-flows-medium'] = 0
+ shared_flow_dict['current-risky-flows-low'] = 0
+
nDPIsrvd_job = multiprocessing.Process(target=nDPIsrvd_worker,
args=(address, shared_flow_dict))
nDPIsrvd_job.start()
diff --git a/examples/py-flow-dashboard/plotly_dash.py b/examples/py-flow-dashboard/plotly_dash.py
index 3577776cf..adc103afb 100644
--- a/examples/py-flow-dashboard/plotly_dash.py
+++ b/examples/py-flow-dashboard/plotly_dash.py
@@ -89,7 +89,7 @@ def build_piechart(labels, values, color_map=None):
COLOR_MAP = {
'piechart-flows': ['rgb(153, 153, 255)', 'rgb(153, 204, 255)', 'rgb(255, 204, 153)', 'rgb(255, 255, 255)'],
'piechart-midstream-flows': ['rgb(255, 255, 153)', 'rgb(153, 153, 255)'],
- 'piechart-risky-flows': ['rgb(255, 153, 153)', 'rgb(153, 153, 255)'],
+ 'piechart-risky-flows': ['rgb(255, 0, 0)', 'rgb(255, 128, 0)', 'rgb(255, 255, 0)', 'rgb(128, 255, 0)', 'rgb(153, 153, 255)'],
'graph-flows': {'Current Active Flows': {'color': 'rgb(153, 153, 255)', 'width': 1},
'Current Risky Flows': {'color': 'rgb(255, 153, 153)', 'width': 3},
'Current Midstream Flows': {'color': 'rgb(255, 255, 153)', 'width': 3},
@@ -150,7 +150,7 @@ def generate_tab_flow():
config={
'displayModeBar': False,
},
- figure=build_piechart(['Risky', 'Not Risky'],
+ figure=build_piechart(['Severy Risk', 'High Risk', 'Medium Risk', 'Low Risk', 'No Risk'],
[0, 0], COLOR_MAP['piechart-risky-flows']),
),
], style={'padding': 10, 'flex': 1}),
@@ -257,8 +257,11 @@ def tab_flow_update_components(n):
shared_flow_dict['current-flows'] -
shared_flow_dict['current-midstream-flows']],
COLOR_MAP['piechart-midstream-flows']),
- build_piechart(['Risky', 'Not Risky'],
- [shared_flow_dict['current-risky-flows'],
+ build_piechart(['Severe', 'High', 'Medium', 'Low', 'No Risk'],
+ [shared_flow_dict['current-risky-flows-severe'],
+ shared_flow_dict['current-risky-flows-high'],
+ shared_flow_dict['current-risky-flows-medium'],
+ shared_flow_dict['current-risky-flows-low'],
shared_flow_dict['current-flows'] -
shared_flow_dict['current-risky-flows']],
COLOR_MAP['piechart-risky-flows'])]
diff --git a/examples/py-flow-info/flow-info.py b/examples/py-flow-info/flow-info.py
index 06bbf83fb..7df843ccb 100755
--- a/examples/py-flow-info/flow-info.py
+++ b/examples/py-flow-info/flow-info.py
@@ -229,18 +229,57 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
instance_and_source = ''
if args.hide_instance_info is False:
- instance_and_source += '[{}]'.format(TermColor.setColorByString(instance.alias))
- instance_and_source += '[{}] '.format(TermColor.setColorByString(instance.source))
+ instance_and_source += '[{}][{}][{:.>2}] '.format(
+ TermColor.setColorByString(instance.alias),
+ TermColor.setColorByString(instance.source),
+ json_dict['thread_id'])
+ else:
+ instance_and_source += ' '
+
+ basic_daemon_event_prefix = ''
+ timestamp = ''
+ if args.print_timestamp is True:
+ if 'thread_ts_msec' in json_dict:
+ timestamp += '[{}]'.format(time.strftime('%H:%M:%S',
+ time.localtime(json_dict['thread_ts_msec'] / 1000)))
+ elif 'global_ts_msec' in json_dict:
+ timestamp += '[{}]'.format(time.strftime('%H:%M:%S',
+ time.localtime(json_dict['global_ts_msec'] / 1000)))
+
+ first_seen = ''
+ if args.print_first_seen is True:
+ basic_daemon_event_prefix += ' ' * 11
+ if 'flow_first_seen' in json_dict:
+ first_seen = '[' + prettifyTimediff(json_dict['flow_first_seen'] / 1000,
+ json_dict['thread_ts_msec'] / 1000) + ']'
+
+ last_seen = ''
+ if args.print_last_seen is True:
+ basic_daemon_event_prefix += ' ' * 11
+ if 'flow_last_seen' in json_dict:
+ last_seen = '[' + prettifyTimediff(json_dict['flow_last_seen'] / 1000,
+ json_dict['thread_ts_msec'] / 1000) + ']'
if 'daemon_event_id' in json_dict:
- print('{} {}: {}'.format(instance_and_source,
- prettifyEvent([TermColor.WARNING, TermColor.BLINK], 16, 'DAEMON-EVENT'),
- json_dict['daemon_event_name']))
+ if json_dict['daemon_event_name'] == 'status':
+ color = [TermColor.WARNING]
+ daemon_msg = '[Processed: {} pkts][Flows][active: {} / {}|skipped: {}|!detected: {}' \
+ '|guessed: {}|detection-updates: {}|updates: {}]'.format(
+ json_dict['packets-processed'],
+ json_dict['current-active-flows'], json_dict['total-active-flows'],
+ json_dict['total-skipped-flows'],
+ json_dict['total-not-detected-flows'], json_dict['total-guessed-flows'],
+ json_dict['total-detection-updates'], json_dict['total-updates'])
+ else:
+ color = [TermColor.WARNING, TermColor.BLINK]
+ daemon_msg = json_dict['daemon_event_name']
+ print('{}{}{} {}: {}'.format(timestamp, basic_daemon_event_prefix, instance_and_source,
+ prettifyEvent(color, 15, 'DAEMON-EVENT'), daemon_msg))
stats.printStatus()
return True
if 'basic_event_id' in json_dict:
- print('{} {}: {}'.format(instance_and_source,
- prettifyEvent([TermColor.FAIL, TermColor.BLINK], 16, 'BASIC-EVENT'),
+ print('{}{}{} {}: {}'.format(timestamp, basic_daemon_event_prefix, instance_and_source,
+ prettifyEvent([TermColor.FAIL, TermColor.BLINK], 15, 'BASIC-EVENT'),
json_dict['basic_event_name']))
stats.printStatus()
return True
@@ -252,28 +291,6 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
stats.printStatus()
return True
- timestamp = ''
- if args.print_timestamp is True and 'ts_msec' in json_dict:
- timestamp += '[{}]'.format(time.strftime('%H:%M:%S',
- time.localtime(json_dict['ts_msec'] / 1000)))
-
- first_seen = ''
- if args.print_first_seen is True and 'flow_first_seen' in json_dict:
- first_seen = '[' + prettifyTimediff(json_dict['flow_first_seen'] / 1000,
- json_dict['ts_msec'] / 1000) + ']'
-
- last_seen = ''
- if args.print_last_seen is True and 'flow_last_seen' in json_dict:
- last_seen = '[' + prettifyTimediff(json_dict['flow_last_seen'] / 1000,
- json_dict['ts_msec'] / 1000) + ']'
-
- if len(last_seen) > 0:
- last_seen += ' '
- elif len(timestamp) > 0:
- timestamp += ' '
- elif len(first_seen) > 0:
- first_seen += ' '
-
ndpi_proto_categ_breed = ''
ndpi_frisk = ''
@@ -288,14 +305,33 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
ndpi_proto_categ_breed += '[' + str(json_dict['ndpi']['breed']) + ']'
if 'flow_risk' in json_dict['ndpi']:
+ severity = 0
cnt = 0
+
for key in json_dict['ndpi']['flow_risk']:
- ndpi_frisk += str(json_dict['ndpi']['flow_risk'][key]) + ', '
+ ndpi_frisk += str(json_dict['ndpi']['flow_risk'][key]['risk']) + ', '
+ if json_dict['ndpi']['flow_risk'][key]['severity'] == 'Low':
+ severity = max(severity, 1)
+ elif json_dict['ndpi']['flow_risk'][key]['severity'] == 'Medium':
+ severity = max(severity, 2)
+ elif json_dict['ndpi']['flow_risk'][key]['severity'] == 'High':
+ severity = max(severity, 3)
+ elif json_dict['ndpi']['flow_risk'][key]['severity'] == 'Severe':
+ severity = max(severity, 4)
cnt += 1
- ndpi_frisk = '{}: {}'.format(
- TermColor.WARNING + TermColor.BOLD + 'RISK' + TermColor.END if cnt < 2
- else TermColor.FAIL + TermColor.BOLD + TermColor.BLINK + 'RISK' + TermColor.END,
- ndpi_frisk[:-2])
+
+ if severity == 1:
+ color = TermColor.WARNING + TermColor.BOLD
+ elif severity == 2:
+ color = TermColor.WARNING + TermColor.BOLD + TermColor.BLINK
+ elif severity == 3:
+ color = TermColor.FAIL + TermColor.BOLD
+ elif severity == 4:
+ color = TermColor.FAIL + TermColor.BOLD + TermColor.BLINK
+ else:
+ color = ''
+
+ ndpi_frisk = '{}{}{}: {}'.format(color, 'RISK', TermColor.END, ndpi_frisk[:-2])
line_suffix = ''
flow_event_name = ''
diff --git a/examples/py-semantic-validation/py-semantic-validation.py b/examples/py-semantic-validation/py-semantic-validation.py
index 3ca90edcf..bce0355de 100755
--- a/examples/py-semantic-validation/py-semantic-validation.py
+++ b/examples/py-semantic-validation/py-semantic-validation.py
@@ -21,7 +21,7 @@ class Stats:
self.nsock = nDPIsrvd_sock
def resetEventCounter(self):
- keys = ['init','reconnect','shutdown', \
+ keys = ['init','reconnect','shutdown','status', \
'new','end','idle','update',
'guessed','detected','detection-update','not-detected', \
'packet', 'packet-flow']
@@ -53,7 +53,7 @@ class Stats:
return True
def getEventCounterStr(self):
- keys = [ [ 'init','reconnect','shutdown' ], \
+ keys = [ [ 'init','reconnect','shutdown','status' ], \
[ 'new','end','idle','update' ], \
[ 'guessed','detected','detection-update','not-detected' ], \
[ 'packet', 'packet-flow' ] ]
@@ -76,8 +76,6 @@ class SemanticValidationException(Exception):
return 'Flow ID {}: {}'.format(self.current_flow.flow_id, self.text)
def onFlowCleanup(instance, current_flow, global_user_data):
- _, enable_timeout_check, _ = global_user_data
-
if type(instance) is not nDPIsrvd.Instance:
raise SemanticValidationException(current_flow,
'instance is not of type nDPIsrvd.Instance: ' \
@@ -99,37 +97,32 @@ def onFlowCleanup(instance, current_flow, global_user_data):
raise SemanticValidationException(current_flow,
'Unexpected flow cleanup reason: CLEANUP_REASON_FLOW_TIMEOUT')
- if enable_timeout_check is True:
- try:
- l4_proto = current_flow.l4_proto
- except AttributeError:
- l4_proto = 'n/a'
-
- invalid_flows = stats.nsock.verify()
- if len(invalid_flows) > 0:
- invalid_flows_str = ''
- for flow_id in invalid_flows:
- flow = instance.flows[flow_id]
- try:
- l4_proto = flow.l4_proto
- except AttributeError:
- l4_proto = 'n/a'
- invalid_flows_str += '{} proto[{},{}] ts[{} + {} < {}] diff[{}], '.format(flow_id, l4_proto, flow.flow_idle_time,
- flow.flow_last_seen, flow.flow_idle_time,
- instance.most_recent_flow_time,
- instance.most_recent_flow_time -
- (flow.flow_last_seen + flow.flow_idle_time))
-
- raise SemanticValidationException(None, 'Flow Manager verification failed for: {}'.format(invalid_flows_str[:-2]))
+ try:
+ l4_proto = current_flow.l4_proto
+ except AttributeError:
+ l4_proto = 'n/a'
- return True
+ invalid_flows = stats.nsock.verify()
+ if len(invalid_flows) > 0:
+ invalid_flows_str = ''
+ for flow_id in invalid_flows:
+ flow = instance.flows[flow_id]
+ try:
+ l4_proto = flow.l4_proto
+ except AttributeError:
+ l4_proto = 'n/a'
+ invalid_flows_str += '{} proto[{},{}] ts[{} + {} < {}] diff[{}], '.format(flow_id, l4_proto, flow.flow_idle_time,
+ flow.flow_last_seen, flow.flow_idle_time,
+ instance.most_recent_flow_time,
+ instance.most_recent_flow_time -
+ (flow.flow_last_seen + flow.flow_idle_time))
-class ThreadData(object):
- lowest_possible_flow_id = 0
- lowest_possible_packet_id = 0
+ raise SemanticValidationException(None, 'Flow Manager verification failed for: {}'.format(invalid_flows_str[:-2]))
+
+ return True
def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
- _, _, stats = global_user_data
+ _, stats = global_user_data
stats.incrementEventCounter(json_dict)
if type(instance) is not nDPIsrvd.Instance:
@@ -149,15 +142,7 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
'stats is not of type Stats: ' \
'{}'.format(type(stats)))
- try:
- thread_data_dict = instance.thread_data
- except AttributeError:
- thread_data_dict = instance.thread_data = dict()
-
- if json_dict['thread_id'] in thread_data_dict:
- td = thread_data_dict[json_dict['thread_id']]
- else:
- td = thread_data_dict[json_dict['thread_id']] = ThreadData()
+ td = instance.getThreadDataFromJSON(json_dict)
for event_name in ['basic_event_name', 'daemon_event_name',
'packet_event_name', 'flow_event_name']:
@@ -165,8 +150,12 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
raise SemanticValidationException(current_flow,
'Received an invalid event for {}'.format(event_name))
- lowest_possible_flow_id = td.lowest_possible_flow_id
- lowest_possible_packet_id = td.lowest_possible_packet_id
+ if td is not None:
+ lowest_possible_flow_id = getattr(td, 'lowest_possible_flow_id', 0)
+ lowest_possible_packet_id = getattr(td, 'lowest_possible_packet_id', 0)
+ else:
+ lowest_possible_flow_id = 0
+ lowest_possible_packet_id = 0
if current_flow is not None:
@@ -203,8 +192,8 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
'both required for timeout handling:' \
'flow_last_seen, flow_idle_time')
- if 'ts_msec' in json_dict:
- current_flow.ts_msec = int(json_dict['ts_msec'])
+ if 'thread_ts_msec' in json_dict:
+ current_flow.thread_ts_msec = int(json_dict['thread_ts_msec'])
if 'flow_packet_id' in json_dict:
try:
@@ -233,7 +222,8 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
'got {}'.format(json_dict['thread_id'],
lowest_possible_packet_id,
json_dict['packet_id']))
- td.lowest_possible_packet_id = lowest_possible_packet_id
+ if td is not None:
+ td.lowest_possible_packet_id = lowest_possible_packet_id
if 'flow_id' in json_dict:
if current_flow.flow_id != json_dict['flow_id']:
@@ -275,13 +265,13 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
pass
try:
- if json_dict['flow_first_seen'] > current_flow.ts_msec or \
- json_dict['flow_last_seen'] > current_flow.ts_msec or \
+ if json_dict['flow_first_seen'] > current_flow.thread_ts_msec or \
+ json_dict['flow_last_seen'] > current_flow.thread_ts_msec or \
json_dict['flow_first_seen'] > json_dict['flow_last_seen']:
raise SemanticValidationException(current_flow,
'Last packet timestamp is invalid: ' \
'first_seen({}) <= {} >= last_seen({})'.format(json_dict['flow_first_seen'],
- current_flow.ts_msec,
+ current_flow.thread_ts_msec,
json_dict['flow_last_seen']))
except AttributeError:
if json_dict['flow_event_name'] == 'new':
@@ -303,7 +293,7 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
pass
current_flow.flow_new_seen = True
current_flow.flow_packet_id = 0
- if lowest_possible_flow_id == 0:
+ if lowest_possible_flow_id == 0 and td is not None:
td.lowest_possible_flow_id = current_flow.flow_id
elif json_dict['flow_event_name'] == 'detected' or \
json_dict['flow_event_name'] == 'not-detected':
@@ -337,8 +327,6 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
if __name__ == '__main__':
argparser = nDPIsrvd.defaultArgumentParser()
argparser.add_argument('--strict', action='store_true', default=False, help='Require and validate a full nDPId application lifecycle.')
- argparser.add_argument('--enable-timeout-check', action='store_true', default=False,
- help='Enable additional flow timeout validation. See README.md for more information')
args = argparser.parse_args()
address = nDPIsrvd.validateAddress(args)
@@ -349,7 +337,7 @@ if __name__ == '__main__':
nsock.connect(address)
stats = Stats(nsock)
try:
- nsock.loop(onJsonLineRecvd, onFlowCleanup, (args.strict, args.enable_timeout_check, stats))
+ nsock.loop(onJsonLineRecvd, onFlowCleanup, (args.strict, stats))
except nDPIsrvd.SocketConnectionBroken as err:
sys.stderr.write('\n{}\n'.format(err))
except KeyboardInterrupt: