summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2022-09-30 18:42:10 +0200
committerToni Uhlig <matzeton@googlemail.com>2022-09-30 19:28:49 +0200
commit14f6b87551c1d03837f25755abbc8eb71d958e3e (patch)
tree6b7f1a3e481f61e726486c8d255b14e0d9e83f12 /examples
parent74f71643da536c6798d077dc1d9b13d56a9afc5d (diff)
Added nDPIsrvd-analysed to generate CSV files from analyse events.
* nDPIsrvd.h: iterate over JSON arrays * nDPId: calculate l3 payload packet entropies for analysis Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'examples')
-rw-r--r--examples/c-analysed/c-analysed.c362
-rwxr-xr-xexamples/py-flow-info/flow-info.py18
2 files changed, 359 insertions, 21 deletions
diff --git a/examples/c-analysed/c-analysed.c b/examples/c-analysed/c-analysed.c
index c0af7a8d9..c12ccf621 100644
--- a/examples/c-analysed/c-analysed.c
+++ b/examples/c-analysed/c-analysed.c
@@ -1,13 +1,25 @@
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
+#include <syslog.h>
#include <unistd.h>
#include "nDPIsrvd.h"
+#include "utils.h"
+
+#define MIN(a, b) (a > b ? b : a)
+#define BUFFER_REMAINING(siz) (NETWORK_BUFFER_MAX_SIZE - siz)
static int main_thread_shutdown = 0;
static struct nDPIsrvd_socket * sock = NULL;
+static char * pidfile = NULL;
+static char * serv_optarg = NULL;
+static char * user = NULL;
+static char * group = NULL;
+static char * csv_outfile = NULL;
+static FILE * csv_fp = NULL;
+
#ifdef ENABLE_MEMORY_PROFILING
void nDPIsrvd_memprof_log_alloc(size_t alloc_size)
{
@@ -129,12 +141,119 @@ static void sighandler(int signum)
}
}
+static void csv_buf_add(char csv_buf[NETWORK_BUFFER_MAX_SIZE + 1],
+ size_t * const csv_buf_used,
+ char const * const str,
+ size_t siz_len)
+{
+ size_t len;
+
+ if (siz_len > 0 && str != NULL)
+ {
+ len = MIN(BUFFER_REMAINING(*csv_buf_used), siz_len);
+ if (len == 0)
+ {
+ return;
+ }
+ strncat(csv_buf, str, len);
+ }
+ else
+ {
+ len = 0;
+ }
+
+ *csv_buf_used += len;
+ if (BUFFER_REMAINING(*csv_buf_used) > 0)
+ {
+ csv_buf[*csv_buf_used] = ',';
+ (*csv_buf_used)++;
+ }
+ csv_buf[*csv_buf_used] = '\0';
+}
+
+static int json_value_to_csv(struct nDPIsrvd_socket * const sock,
+ char csv_buf[NETWORK_BUFFER_MAX_SIZE + 1],
+ size_t * const csv_buf_used,
+ char const * const json_key,
+ ...)
+{
+ va_list ap;
+ nDPIsrvd_hashkey key;
+ struct nDPIsrvd_json_token const * token;
+ size_t val_length = 0;
+ char const * val;
+ int ret = 0;
+
+ va_start(ap, json_key);
+ key = nDPIsrvd_vbuild_jsmn_key(json_key, ap);
+ va_end(ap);
+
+ token = nDPIsrvd_find_token(sock, key);
+ if (token == NULL)
+ {
+ ret++;
+ }
+
+ val = TOKEN_GET_VALUE(sock, token, &val_length);
+ if (val == NULL)
+ {
+ ret++;
+ }
+
+ csv_buf_add(csv_buf, csv_buf_used, val, val_length);
+
+ return ret;
+}
+
+static int json_array_to_csv(struct nDPIsrvd_socket * const sock,
+ char csv_buf[NETWORK_BUFFER_MAX_SIZE + 1],
+ size_t * const csv_buf_used,
+ char const * const json_key,
+ ...)
+{
+ va_list ap;
+ nDPIsrvd_hashkey key;
+ struct nDPIsrvd_json_token const * token;
+ int ret = 0;
+
+ va_start(ap, json_key);
+ key = nDPIsrvd_vbuild_jsmn_key(json_key, ap);
+ va_end(ap);
+
+ token = nDPIsrvd_find_token(sock, key);
+ if (token == NULL)
+ {
+ ret++;
+ csv_buf_add(csv_buf, csv_buf_used, NULL, 0);
+ }
+
+ {
+ struct nDPIsrvd_json_token next = {};
+
+ csv_buf_add(csv_buf, csv_buf_used, "\"", 1);
+ csv_buf[--(*csv_buf_used)] = '\0';
+ while (nDPIsrvd_token_iterate(sock, token, &next) == 0)
+ {
+ size_t val_length = 0;
+ char const * const val = TOKEN_GET_VALUE(sock, &next, &val_length);
+
+ csv_buf_add(csv_buf, csv_buf_used, val, val_length);
+ }
+ csv_buf[--(*csv_buf_used)] = '\0';
+ csv_buf_add(csv_buf, csv_buf_used, "\"", 1);
+ }
+
+ return ret;
+}
+
static enum nDPIsrvd_callback_return simple_json_callback(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance * const instance,
struct nDPIsrvd_thread_data * const thread_data,
struct nDPIsrvd_flow * const flow)
{
- (void)sock;
+ char csv_buf[NETWORK_BUFFER_MAX_SIZE + 1];
+ size_t csv_buf_used = 0;
+
(void)instance;
(void)thread_data;
@@ -154,36 +273,226 @@ static enum nDPIsrvd_callback_return simple_json_callback(struct nDPIsrvd_socket
return CALLBACK_ERROR;
}
- struct nDPIsrvd_json_token const * const iat = TOKEN_GET_SZ(sock, "data_analysis", "iat");
- if (iat == NULL)
+ csv_buf[0] = '\0';
+
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_datalink", NULL);
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "l3_proto", NULL);
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "src_ip", NULL);
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "dst_ip", NULL);
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "l4_proto", NULL);
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "src_port", NULL);
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "dst_port", NULL);
+
+ if (json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_state", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_src_packets_processed", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_dst_packets_processed", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_first_seen", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_src_last_pkt_time", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_dst_last_pkt_time", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_src_min_l4_payload_len", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_dst_min_l4_payload_len", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_src_max_l4_payload_len", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_dst_max_l4_payload_len", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_src_tot_l4_payload_len", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_dst_tot_l4_payload_len", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "midstream", NULL) != 0)
+ {
+ return CALLBACK_ERROR;
+ }
+
+ if (json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "iat", "min", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "iat", "avg", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "iat", "max", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "iat", "stddev", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "iat", "var", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "iat", "ent", NULL) != 0)
+ {
+ return CALLBACK_ERROR;
+ }
+
+ if (json_array_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "iat", "data", NULL) != 0)
+ {
+ return CALLBACK_ERROR;
+ }
+
+ if (json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "pktlen", "min", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "pktlen", "avg", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "pktlen", "max", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "pktlen", "stddev", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "pktlen", "var", NULL) != 0 ||
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "pktlen", "ent", NULL) != 0)
+ {
+ return CALLBACK_ERROR;
+ }
+
+ if (json_array_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "pktlen", "data", NULL) != 0)
+ {
+ return CALLBACK_ERROR;
+ }
+
+ if (json_array_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "bins", "c_to_s", NULL) != 0)
+ {
+ return CALLBACK_ERROR;
+ }
+
+ if (json_array_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "bins", "s_to_c", NULL) != 0)
+ {
+ return CALLBACK_ERROR;
+ }
+
+ if (json_array_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "directions", NULL) != 0)
{
return CALLBACK_ERROR;
}
+ if (json_array_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "entropies", NULL) != 0)
{
+ return CALLBACK_ERROR;
+ }
+
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "ndpi", "proto", NULL);
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "ndpi", "proto_id", NULL);
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "ndpi", "encrypted", NULL);
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "ndpi", "breed", NULL);
+ json_value_to_csv(sock, csv_buf, &csv_buf_used, "ndpi", "category", NULL);
+ {
+ struct nDPIsrvd_json_token const * const token = TOKEN_GET_SZ(sock, "ndpi", "confidence");
struct nDPIsrvd_json_token const * current = NULL;
int next_child_index = -1;
- while ((current = nDPIsrvd_get_next_token(sock, iat, &next_child_index)) != NULL)
+ if (token == NULL)
{
- size_t key_length = 0;
- char const * const key = TOKEN_GET_KEY(sock, current, &key_length);
+ csv_buf_add(csv_buf, &csv_buf_used, NULL, 0);
+ csv_buf_add(csv_buf, &csv_buf_used, NULL, 0);
+ }
+ else
+ {
+ while ((current = nDPIsrvd_get_next_token(sock, token, &next_child_index)) != NULL)
+ {
+ size_t key_length = 0, value_length = 0;
+ char const * const key = TOKEN_GET_KEY(sock, current, &key_length);
+ char const * const value = TOKEN_GET_VALUE(sock, current, &value_length);
- printf("__%.*s__\n", (int)key_length, key);
- next_child_index++;
+ csv_buf_add(csv_buf, &csv_buf_used, key, key_length);
+ csv_buf_add(csv_buf, &csv_buf_used, value, value_length);
+ }
}
-printf("===\n");
}
+ if (csv_buf_used > 0 && csv_buf[csv_buf_used - 1] == ',')
+ {
+ csv_buf[--csv_buf_used] = '\0';
+ }
+
+ fprintf(csv_fp, "%.*s\n", (int)csv_buf_used, csv_buf);
+
return CALLBACK_OK;
}
-static void simple_flow_cleanup_callback(struct nDPIsrvd_socket * const sock,
- struct nDPIsrvd_instance * const instance,
- struct nDPIsrvd_thread_data * const thread_data,
- struct nDPIsrvd_flow * const flow,
- enum nDPIsrvd_cleanup_reason reason)
+static void print_usage(char const * const arg0)
{
+ static char const usage[] =
+ "Usage: %s "
+ "[-d] [-p pidfile] [-s host]\n"
+ "\t \t[-u user] [-g group] [-o csv-outfile]\n\n"
+ "\t-d\tForking into background after initialization.\n"
+ "\t-p\tWrite the daemon PID to the given file path.\n"
+ "\t-s\tDestination where nDPIsrvd is listening on.\n"
+ "\t \tCan be either a path to UNIX socket or an IPv4/TCP-Port IPv6/TCP-Port tuple.\n"
+ "\t-u\tChange user.\n"
+ "\t-g\tChange group.\n"
+ "\t-o\tSpecify the CSV output file for analysis results\n\n";
+
+ fprintf(stderr, usage, arg0);
+}
+
+static int parse_options(int argc, char ** argv)
+{
+ int opt;
+
+ while ((opt = getopt(argc, argv, "hdp:s:u:g:o:")) != -1)
+ {
+ switch (opt)
+ {
+ case 'd':
+ daemonize_enable();
+ break;
+ case 'p':
+ free(pidfile);
+ pidfile = strdup(optarg);
+ break;
+ case 's':
+ free(serv_optarg);
+ serv_optarg = strdup(optarg);
+ break;
+ case 'u':
+ free(user);
+ user = strdup(optarg);
+ break;
+ case 'g':
+ free(group);
+ group = strdup(optarg);
+ break;
+ case 'o':
+ free(csv_outfile);
+ csv_outfile = strdup(optarg);
+ break;
+ default:
+ print_usage(argv[0]);
+ return 1;
+ }
+ }
+
+ if (csv_outfile == NULL)
+ {
+ fprintf(stderr, "%s: Missing CSV output file (`-o')\n", argv[0]);
+ return 1;
+ }
+
+ opt = 0;
+ if (access(csv_outfile, F_OK) != 0 && errno == ENOENT)
+ {
+ opt = 1;
+ }
+
+ csv_fp = fopen(csv_outfile, "a+");
+ if (csv_fp == NULL)
+ {
+ fprintf(stderr, "%s: Could not open file `%s' for appending\n", argv[0], csv_outfile);
+ return 1;
+ }
+
+ if (opt != 0)
+ {
+ fprintf(csv_fp,
+ "flow_datalink,l3_proto,src_ip,dst_ip,l4_proto,src_port,dst_port,flow_state,flow_src_packets_processed,"
+ "flow_dst_packets_processed,flow_first_seen,flow_src_last_pkt_time,flow_dst_last_pkt_time,flow_src_min_"
+ "l4_payload_len,flow_dst_min_l4_payload_len,flow_src_max_l4_payload_len,flow_dst_max_l4_payload_len,"
+ "flow_src_tot_l4_payload_len,flow_dst_tot_l4_payload_len,midstream,iat_min,iat_avg,iat_max,iat_stddev,"
+ "iat_var,iat_ent,iat_data,pktlen_min,pktlen_avg,pktlen_max,pktlen_stddev,pktlen_var,pktlen_ent,pktlen_"
+ "data,bins_c_to_s,bins_s_to_c,directions,entropies,proto,proto_id,encrypted,breed,category,"
+ "confidence_id,confidence\n");
+ }
+
+ if (serv_optarg == NULL)
+ {
+ serv_optarg = strdup(DISTRIBUTOR_UNIX_SOCKET);
+ }
+
+ if (nDPIsrvd_setup_address(&sock->address, serv_optarg) != 0)
+ {
+ fprintf(stderr, "%s: Could not parse address `%s'\n", argv[0], serv_optarg);
+ return 1;
+ }
+
+ if (optind < argc)
+ {
+ fprintf(stderr, "Unexpected argument after options\n\n");
+ print_usage(argv[0]);
+ return 1;
+ }
+
+ return 0;
}
int main(int argc, char ** argv)
@@ -193,23 +502,38 @@ int main(int argc, char ** argv)
signal(SIGTERM, sighandler);
signal(SIGPIPE, sighandler);
- sock = nDPIsrvd_socket_init(0, 0, 0, 0, simple_json_callback, NULL, simple_flow_cleanup_callback);
+ sock = nDPIsrvd_socket_init(0, 0, 0, 0, simple_json_callback, NULL, NULL);
if (sock == NULL)
{
return 1;
}
- if (nDPIsrvd_setup_address(&sock->address, (argc > 1 ? argv[1] : "127.0.0.1:7000")) != 0)
+ if (parse_options(argc, argv) != 0)
{
return 1;
}
+ printf("Recv buffer size: %u\n", NETWORK_BUFFER_MAX_SIZE);
+ printf("Connecting to `%s'..\n", serv_optarg);
+
if (nDPIsrvd_connect(sock) != CONNECT_OK)
{
+ fprintf(stderr, "%s: nDPIsrvd socket connect to %s failed!\n", argv[0], serv_optarg);
nDPIsrvd_socket_free(&sock);
return 1;
}
+ signal(SIGUSR1, sighandler);
+ signal(SIGINT, sighandler);
+ signal(SIGTERM, sighandler);
+ signal(SIGPIPE, sighandler);
+
+ if (daemonize_with_pidfile(pidfile) != 0)
+ {
+ return 1;
+ }
+ openlog("nDPIsrvd-analyzed", LOG_CONS, LOG_DAEMON);
+
if (nDPIsrvd_set_read_timeout(sock, 180, 0) != 0)
{
return 1;
@@ -247,5 +571,9 @@ int main(int argc, char ** argv)
printf("Parse read %s\n", nDPIsrvd_enum_to_string(read_ret));
}
- return 1;
+ nDPIsrvd_socket_free(&sock);
+ daemonize_shutdown(pidfile);
+ closelog();
+
+ return read_ret;
}
diff --git a/examples/py-flow-info/flow-info.py b/examples/py-flow-info/flow-info.py
index 3c58858ed..0fba6fcaa 100755
--- a/examples/py-flow-info/flow-info.py
+++ b/examples/py-flow-info/flow-info.py
@@ -363,18 +363,18 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
flow_event_name += '{}{:>16}{}'.format(TermColor.WARNING,
json_dict['flow_event_name'], TermColor.END)
if args.print_analyse_results is True:
- next_lines = [' {:>9}|{:>9}|{:>9}|{:>9}|{:>9}|{:>9}'.format(
+ next_lines = [' {:>9}|{:>9}|{:>9}|{:>9}|{:>15}|{:>8}'.format(
'min', 'max', 'avg', 'stddev', 'variance', 'entropy')]
- next_lines += ['[IAT.........: {:>9.3f}|{:>9.3f}|{:>9.3f}|{:>9.3f}|{:>9.3f}|{:>9.3f}]'.format(
+ next_lines += ['[IAT.........: {:>9.3f}|{:>9.3f}|{:>9.3f}|{:>9.3f}|{:>15.3f}|{:>8.3f}]'.format(
nDPIsrvd.toSeconds(json_dict['data_analysis']['iat']['min']),
nDPIsrvd.toSeconds(json_dict['data_analysis']['iat']['max']),
nDPIsrvd.toSeconds(json_dict['data_analysis']['iat']['avg']),
nDPIsrvd.toSeconds(json_dict['data_analysis']['iat']['stddev']),
nDPIsrvd.toSeconds(json_dict['data_analysis']['iat']['var']),
- nDPIsrvd.toSeconds(json_dict['data_analysis']['iat']['ent'])
+ json_dict['data_analysis']['iat']['ent']
)]
next_lines += ['']
- next_lines[-1] += '[PKTLEN......: {:>9.3f}|{:>9.3f}|{:>9.3f}|{:>9.3f}|{:>9.3f}|{:>9.3f}]'.format(
+ next_lines[-1] += '[PKTLEN......: {:>9.3f}|{:>9.3f}|{:>9.3f}|{:>9.3f}|{:>15.3f}|{:>8.3f}]'.format(
json_dict['data_analysis']['pktlen']['min'],
json_dict['data_analysis']['pktlen']['max'],
json_dict['data_analysis']['pktlen']['avg'],
@@ -396,6 +396,12 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
next_lines[-1] += '[IATS(ms)....: {}]'.format(iats)
next_lines += ['']
next_lines[-1] += '[PKTLENS.....: {}]'.format(','.join([str(n) for n in json_dict['data_analysis']['pktlen']['data']]))
+ next_lines += ['']
+ ents = ''
+ for n in json_dict['data_analysis']['entropies']:
+ ents += '{:.1f},'.format(n)
+ ents = ents[:-1]
+ next_lines[-1] += '[ENTROPIES...: {}]'.format(ents)
else:
if json_dict['flow_event_name'] == 'new':
line_suffix = ''
@@ -416,6 +422,9 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
line_suffix += ']'
flow_event_name += '{}{:>16}{}'.format(flow_active_color, json_dict['flow_event_name'], TermColor.END)
+ if args.print_hostname is True and 'ndpi' in json_dict and 'hostname' in json_dict['ndpi']:
+ line_suffix += '[{}]'.format(json_dict['ndpi']['hostname'])
+
if json_dict['l3_proto'] == 'ip4':
print('{}{}{}{}{}: [{:.>6}] [{}][{:.>5}] [{:.>15}]{} -> [{:.>15}]{} {}{}' \
''.format(timestamp, first_seen, last_seen, instance_and_source, flow_event_name,
@@ -472,6 +481,7 @@ if __name__ == '__main__':
argparser.add_argument('--analyse', action='store_true', default=False, help='Print only analyse flow events.')
argparser.add_argument('--detection', action='store_true', default=False, help='Print only detected/detection-update flow events.')
argparser.add_argument('--ipwhois', action='store_true', default=False, help='Use Python-IPWhois to print additional location information.')
+ argparser.add_argument('--print-hostname', action='store_true', default=False, help='Print detected hostnames if available.')
argparser.add_argument('--print-analyse-results', action='store_true', default=False,
help='Print detailed results of analyse events.')
args = argparser.parse_args()