diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2022-09-30 18:42:10 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2022-09-30 19:28:49 +0200 |
commit | 14f6b87551c1d03837f25755abbc8eb71d958e3e (patch) | |
tree | 6b7f1a3e481f61e726486c8d255b14e0d9e83f12 /examples | |
parent | 74f71643da536c6798d077dc1d9b13d56a9afc5d (diff) |
Added nDPIsrvd-analysed to generate CSV files from analyse events.
* nDPIsrvd.h: iterate over JSON arrays
* nDPId: calculate l3 payload packet entropies for analysis
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'examples')
-rw-r--r-- | examples/c-analysed/c-analysed.c | 362 | ||||
-rwxr-xr-x | examples/py-flow-info/flow-info.py | 18 |
2 files changed, 359 insertions, 21 deletions
diff --git a/examples/c-analysed/c-analysed.c b/examples/c-analysed/c-analysed.c index c0af7a8d9..c12ccf621 100644 --- a/examples/c-analysed/c-analysed.c +++ b/examples/c-analysed/c-analysed.c @@ -1,13 +1,25 @@ #include <signal.h> #include <stdio.h> #include <stdlib.h> +#include <syslog.h> #include <unistd.h> #include "nDPIsrvd.h" +#include "utils.h" + +#define MIN(a, b) (a > b ? b : a) +#define BUFFER_REMAINING(siz) (NETWORK_BUFFER_MAX_SIZE - siz) static int main_thread_shutdown = 0; static struct nDPIsrvd_socket * sock = NULL; +static char * pidfile = NULL; +static char * serv_optarg = NULL; +static char * user = NULL; +static char * group = NULL; +static char * csv_outfile = NULL; +static FILE * csv_fp = NULL; + #ifdef ENABLE_MEMORY_PROFILING void nDPIsrvd_memprof_log_alloc(size_t alloc_size) { @@ -129,12 +141,119 @@ static void sighandler(int signum) } } +static void csv_buf_add(char csv_buf[NETWORK_BUFFER_MAX_SIZE + 1], + size_t * const csv_buf_used, + char const * const str, + size_t siz_len) +{ + size_t len; + + if (siz_len > 0 && str != NULL) + { + len = MIN(BUFFER_REMAINING(*csv_buf_used), siz_len); + if (len == 0) + { + return; + } + strncat(csv_buf, str, len); + } + else + { + len = 0; + } + + *csv_buf_used += len; + if (BUFFER_REMAINING(*csv_buf_used) > 0) + { + csv_buf[*csv_buf_used] = ','; + (*csv_buf_used)++; + } + csv_buf[*csv_buf_used] = '\0'; +} + +static int json_value_to_csv(struct nDPIsrvd_socket * const sock, + char csv_buf[NETWORK_BUFFER_MAX_SIZE + 1], + size_t * const csv_buf_used, + char const * const json_key, + ...) +{ + va_list ap; + nDPIsrvd_hashkey key; + struct nDPIsrvd_json_token const * token; + size_t val_length = 0; + char const * val; + int ret = 0; + + va_start(ap, json_key); + key = nDPIsrvd_vbuild_jsmn_key(json_key, ap); + va_end(ap); + + token = nDPIsrvd_find_token(sock, key); + if (token == NULL) + { + ret++; + } + + val = TOKEN_GET_VALUE(sock, token, &val_length); + if (val == NULL) + { + ret++; + } + + csv_buf_add(csv_buf, csv_buf_used, val, val_length); + + return ret; +} + +static int json_array_to_csv(struct nDPIsrvd_socket * const sock, + char csv_buf[NETWORK_BUFFER_MAX_SIZE + 1], + size_t * const csv_buf_used, + char const * const json_key, + ...) +{ + va_list ap; + nDPIsrvd_hashkey key; + struct nDPIsrvd_json_token const * token; + int ret = 0; + + va_start(ap, json_key); + key = nDPIsrvd_vbuild_jsmn_key(json_key, ap); + va_end(ap); + + token = nDPIsrvd_find_token(sock, key); + if (token == NULL) + { + ret++; + csv_buf_add(csv_buf, csv_buf_used, NULL, 0); + } + + { + struct nDPIsrvd_json_token next = {}; + + csv_buf_add(csv_buf, csv_buf_used, "\"", 1); + csv_buf[--(*csv_buf_used)] = '\0'; + while (nDPIsrvd_token_iterate(sock, token, &next) == 0) + { + size_t val_length = 0; + char const * const val = TOKEN_GET_VALUE(sock, &next, &val_length); + + csv_buf_add(csv_buf, csv_buf_used, val, val_length); + } + csv_buf[--(*csv_buf_used)] = '\0'; + csv_buf_add(csv_buf, csv_buf_used, "\"", 1); + } + + return ret; +} + static enum nDPIsrvd_callback_return simple_json_callback(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_instance * const instance, struct nDPIsrvd_thread_data * const thread_data, struct nDPIsrvd_flow * const flow) { - (void)sock; + char csv_buf[NETWORK_BUFFER_MAX_SIZE + 1]; + size_t csv_buf_used = 0; + (void)instance; (void)thread_data; @@ -154,36 +273,226 @@ static enum nDPIsrvd_callback_return simple_json_callback(struct nDPIsrvd_socket return CALLBACK_ERROR; } - struct nDPIsrvd_json_token const * const iat = TOKEN_GET_SZ(sock, "data_analysis", "iat"); - if (iat == NULL) + csv_buf[0] = '\0'; + + json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_datalink", NULL); + json_value_to_csv(sock, csv_buf, &csv_buf_used, "l3_proto", NULL); + json_value_to_csv(sock, csv_buf, &csv_buf_used, "src_ip", NULL); + json_value_to_csv(sock, csv_buf, &csv_buf_used, "dst_ip", NULL); + json_value_to_csv(sock, csv_buf, &csv_buf_used, "l4_proto", NULL); + json_value_to_csv(sock, csv_buf, &csv_buf_used, "src_port", NULL); + json_value_to_csv(sock, csv_buf, &csv_buf_used, "dst_port", NULL); + + if (json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_state", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_src_packets_processed", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_dst_packets_processed", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_first_seen", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_src_last_pkt_time", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_dst_last_pkt_time", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_src_min_l4_payload_len", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_dst_min_l4_payload_len", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_src_max_l4_payload_len", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_dst_max_l4_payload_len", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_src_tot_l4_payload_len", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "flow_dst_tot_l4_payload_len", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "midstream", NULL) != 0) + { + return CALLBACK_ERROR; + } + + if (json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "iat", "min", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "iat", "avg", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "iat", "max", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "iat", "stddev", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "iat", "var", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "iat", "ent", NULL) != 0) + { + return CALLBACK_ERROR; + } + + if (json_array_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "iat", "data", NULL) != 0) + { + return CALLBACK_ERROR; + } + + if (json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "pktlen", "min", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "pktlen", "avg", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "pktlen", "max", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "pktlen", "stddev", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "pktlen", "var", NULL) != 0 || + json_value_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "pktlen", "ent", NULL) != 0) + { + return CALLBACK_ERROR; + } + + if (json_array_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "pktlen", "data", NULL) != 0) + { + return CALLBACK_ERROR; + } + + if (json_array_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "bins", "c_to_s", NULL) != 0) + { + return CALLBACK_ERROR; + } + + if (json_array_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "bins", "s_to_c", NULL) != 0) + { + return CALLBACK_ERROR; + } + + if (json_array_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "directions", NULL) != 0) { return CALLBACK_ERROR; } + if (json_array_to_csv(sock, csv_buf, &csv_buf_used, "data_analysis", "entropies", NULL) != 0) { + return CALLBACK_ERROR; + } + + json_value_to_csv(sock, csv_buf, &csv_buf_used, "ndpi", "proto", NULL); + json_value_to_csv(sock, csv_buf, &csv_buf_used, "ndpi", "proto_id", NULL); + json_value_to_csv(sock, csv_buf, &csv_buf_used, "ndpi", "encrypted", NULL); + json_value_to_csv(sock, csv_buf, &csv_buf_used, "ndpi", "breed", NULL); + json_value_to_csv(sock, csv_buf, &csv_buf_used, "ndpi", "category", NULL); + { + struct nDPIsrvd_json_token const * const token = TOKEN_GET_SZ(sock, "ndpi", "confidence"); struct nDPIsrvd_json_token const * current = NULL; int next_child_index = -1; - while ((current = nDPIsrvd_get_next_token(sock, iat, &next_child_index)) != NULL) + if (token == NULL) { - size_t key_length = 0; - char const * const key = TOKEN_GET_KEY(sock, current, &key_length); + csv_buf_add(csv_buf, &csv_buf_used, NULL, 0); + csv_buf_add(csv_buf, &csv_buf_used, NULL, 0); + } + else + { + while ((current = nDPIsrvd_get_next_token(sock, token, &next_child_index)) != NULL) + { + size_t key_length = 0, value_length = 0; + char const * const key = TOKEN_GET_KEY(sock, current, &key_length); + char const * const value = TOKEN_GET_VALUE(sock, current, &value_length); - printf("__%.*s__\n", (int)key_length, key); - next_child_index++; + csv_buf_add(csv_buf, &csv_buf_used, key, key_length); + csv_buf_add(csv_buf, &csv_buf_used, value, value_length); + } } -printf("===\n"); } + if (csv_buf_used > 0 && csv_buf[csv_buf_used - 1] == ',') + { + csv_buf[--csv_buf_used] = '\0'; + } + + fprintf(csv_fp, "%.*s\n", (int)csv_buf_used, csv_buf); + return CALLBACK_OK; } -static void simple_flow_cleanup_callback(struct nDPIsrvd_socket * const sock, - struct nDPIsrvd_instance * const instance, - struct nDPIsrvd_thread_data * const thread_data, - struct nDPIsrvd_flow * const flow, - enum nDPIsrvd_cleanup_reason reason) +static void print_usage(char const * const arg0) { + static char const usage[] = + "Usage: %s " + "[-d] [-p pidfile] [-s host]\n" + "\t \t[-u user] [-g group] [-o csv-outfile]\n\n" + "\t-d\tForking into background after initialization.\n" + "\t-p\tWrite the daemon PID to the given file path.\n" + "\t-s\tDestination where nDPIsrvd is listening on.\n" + "\t \tCan be either a path to UNIX socket or an IPv4/TCP-Port IPv6/TCP-Port tuple.\n" + "\t-u\tChange user.\n" + "\t-g\tChange group.\n" + "\t-o\tSpecify the CSV output file for analysis results\n\n"; + + fprintf(stderr, usage, arg0); +} + +static int parse_options(int argc, char ** argv) +{ + int opt; + + while ((opt = getopt(argc, argv, "hdp:s:u:g:o:")) != -1) + { + switch (opt) + { + case 'd': + daemonize_enable(); + break; + case 'p': + free(pidfile); + pidfile = strdup(optarg); + break; + case 's': + free(serv_optarg); + serv_optarg = strdup(optarg); + break; + case 'u': + free(user); + user = strdup(optarg); + break; + case 'g': + free(group); + group = strdup(optarg); + break; + case 'o': + free(csv_outfile); + csv_outfile = strdup(optarg); + break; + default: + print_usage(argv[0]); + return 1; + } + } + + if (csv_outfile == NULL) + { + fprintf(stderr, "%s: Missing CSV output file (`-o')\n", argv[0]); + return 1; + } + + opt = 0; + if (access(csv_outfile, F_OK) != 0 && errno == ENOENT) + { + opt = 1; + } + + csv_fp = fopen(csv_outfile, "a+"); + if (csv_fp == NULL) + { + fprintf(stderr, "%s: Could not open file `%s' for appending\n", argv[0], csv_outfile); + return 1; + } + + if (opt != 0) + { + fprintf(csv_fp, + "flow_datalink,l3_proto,src_ip,dst_ip,l4_proto,src_port,dst_port,flow_state,flow_src_packets_processed," + "flow_dst_packets_processed,flow_first_seen,flow_src_last_pkt_time,flow_dst_last_pkt_time,flow_src_min_" + "l4_payload_len,flow_dst_min_l4_payload_len,flow_src_max_l4_payload_len,flow_dst_max_l4_payload_len," + "flow_src_tot_l4_payload_len,flow_dst_tot_l4_payload_len,midstream,iat_min,iat_avg,iat_max,iat_stddev," + "iat_var,iat_ent,iat_data,pktlen_min,pktlen_avg,pktlen_max,pktlen_stddev,pktlen_var,pktlen_ent,pktlen_" + "data,bins_c_to_s,bins_s_to_c,directions,entropies,proto,proto_id,encrypted,breed,category," + "confidence_id,confidence\n"); + } + + if (serv_optarg == NULL) + { + serv_optarg = strdup(DISTRIBUTOR_UNIX_SOCKET); + } + + if (nDPIsrvd_setup_address(&sock->address, serv_optarg) != 0) + { + fprintf(stderr, "%s: Could not parse address `%s'\n", argv[0], serv_optarg); + return 1; + } + + if (optind < argc) + { + fprintf(stderr, "Unexpected argument after options\n\n"); + print_usage(argv[0]); + return 1; + } + + return 0; } int main(int argc, char ** argv) @@ -193,23 +502,38 @@ int main(int argc, char ** argv) signal(SIGTERM, sighandler); signal(SIGPIPE, sighandler); - sock = nDPIsrvd_socket_init(0, 0, 0, 0, simple_json_callback, NULL, simple_flow_cleanup_callback); + sock = nDPIsrvd_socket_init(0, 0, 0, 0, simple_json_callback, NULL, NULL); if (sock == NULL) { return 1; } - if (nDPIsrvd_setup_address(&sock->address, (argc > 1 ? argv[1] : "127.0.0.1:7000")) != 0) + if (parse_options(argc, argv) != 0) { return 1; } + printf("Recv buffer size: %u\n", NETWORK_BUFFER_MAX_SIZE); + printf("Connecting to `%s'..\n", serv_optarg); + if (nDPIsrvd_connect(sock) != CONNECT_OK) { + fprintf(stderr, "%s: nDPIsrvd socket connect to %s failed!\n", argv[0], serv_optarg); nDPIsrvd_socket_free(&sock); return 1; } + signal(SIGUSR1, sighandler); + signal(SIGINT, sighandler); + signal(SIGTERM, sighandler); + signal(SIGPIPE, sighandler); + + if (daemonize_with_pidfile(pidfile) != 0) + { + return 1; + } + openlog("nDPIsrvd-analyzed", LOG_CONS, LOG_DAEMON); + if (nDPIsrvd_set_read_timeout(sock, 180, 0) != 0) { return 1; @@ -247,5 +571,9 @@ int main(int argc, char ** argv) printf("Parse read %s\n", nDPIsrvd_enum_to_string(read_ret)); } - return 1; + nDPIsrvd_socket_free(&sock); + daemonize_shutdown(pidfile); + closelog(); + + return read_ret; } diff --git a/examples/py-flow-info/flow-info.py b/examples/py-flow-info/flow-info.py index 3c58858ed..0fba6fcaa 100755 --- a/examples/py-flow-info/flow-info.py +++ b/examples/py-flow-info/flow-info.py @@ -363,18 +363,18 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data): flow_event_name += '{}{:>16}{}'.format(TermColor.WARNING, json_dict['flow_event_name'], TermColor.END) if args.print_analyse_results is True: - next_lines = [' {:>9}|{:>9}|{:>9}|{:>9}|{:>9}|{:>9}'.format( + next_lines = [' {:>9}|{:>9}|{:>9}|{:>9}|{:>15}|{:>8}'.format( 'min', 'max', 'avg', 'stddev', 'variance', 'entropy')] - next_lines += ['[IAT.........: {:>9.3f}|{:>9.3f}|{:>9.3f}|{:>9.3f}|{:>9.3f}|{:>9.3f}]'.format( + next_lines += ['[IAT.........: {:>9.3f}|{:>9.3f}|{:>9.3f}|{:>9.3f}|{:>15.3f}|{:>8.3f}]'.format( nDPIsrvd.toSeconds(json_dict['data_analysis']['iat']['min']), nDPIsrvd.toSeconds(json_dict['data_analysis']['iat']['max']), nDPIsrvd.toSeconds(json_dict['data_analysis']['iat']['avg']), nDPIsrvd.toSeconds(json_dict['data_analysis']['iat']['stddev']), nDPIsrvd.toSeconds(json_dict['data_analysis']['iat']['var']), - nDPIsrvd.toSeconds(json_dict['data_analysis']['iat']['ent']) + json_dict['data_analysis']['iat']['ent'] )] next_lines += [''] - next_lines[-1] += '[PKTLEN......: {:>9.3f}|{:>9.3f}|{:>9.3f}|{:>9.3f}|{:>9.3f}|{:>9.3f}]'.format( + next_lines[-1] += '[PKTLEN......: {:>9.3f}|{:>9.3f}|{:>9.3f}|{:>9.3f}|{:>15.3f}|{:>8.3f}]'.format( json_dict['data_analysis']['pktlen']['min'], json_dict['data_analysis']['pktlen']['max'], json_dict['data_analysis']['pktlen']['avg'], @@ -396,6 +396,12 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data): next_lines[-1] += '[IATS(ms)....: {}]'.format(iats) next_lines += [''] next_lines[-1] += '[PKTLENS.....: {}]'.format(','.join([str(n) for n in json_dict['data_analysis']['pktlen']['data']])) + next_lines += [''] + ents = '' + for n in json_dict['data_analysis']['entropies']: + ents += '{:.1f},'.format(n) + ents = ents[:-1] + next_lines[-1] += '[ENTROPIES...: {}]'.format(ents) else: if json_dict['flow_event_name'] == 'new': line_suffix = '' @@ -416,6 +422,9 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data): line_suffix += ']' flow_event_name += '{}{:>16}{}'.format(flow_active_color, json_dict['flow_event_name'], TermColor.END) + if args.print_hostname is True and 'ndpi' in json_dict and 'hostname' in json_dict['ndpi']: + line_suffix += '[{}]'.format(json_dict['ndpi']['hostname']) + if json_dict['l3_proto'] == 'ip4': print('{}{}{}{}{}: [{:.>6}] [{}][{:.>5}] [{:.>15}]{} -> [{:.>15}]{} {}{}' \ ''.format(timestamp, first_seen, last_seen, instance_and_source, flow_event_name, @@ -472,6 +481,7 @@ if __name__ == '__main__': argparser.add_argument('--analyse', action='store_true', default=False, help='Print only analyse flow events.') argparser.add_argument('--detection', action='store_true', default=False, help='Print only detected/detection-update flow events.') argparser.add_argument('--ipwhois', action='store_true', default=False, help='Use Python-IPWhois to print additional location information.') + argparser.add_argument('--print-hostname', action='store_true', default=False, help='Print detected hostnames if available.') argparser.add_argument('--print-analyse-results', action='store_true', default=False, help='Print detailed results of analyse events.') args = argparser.parse_args() |