aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2020-07-10 20:48:23 +0200
committerToni Uhlig <matzeton@googlemail.com>2020-07-10 20:48:23 +0200
commita9aa4f12362e21fe59ef59c2e1f20b0dca2185d4 (patch)
tree81fc2f1aa7ac7931911fed786514d6ada7e727fc
parent7867c3979d7c9f6d5cee28636328c0f1329fa290 (diff)
clang-format and set BreakBeforeBraces to Allmanz
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rw-r--r--.clang-format2
-rw-r--r--nDPId.c658
2 files changed, 439 insertions, 221 deletions
diff --git a/.clang-format b/.clang-format
index cdf33a6a0..aeb942a71 100644
--- a/.clang-format
+++ b/.clang-format
@@ -51,7 +51,7 @@ Cpp11BracedListStyle: true
IndentWidth: 4
TabWidth: 4
UseTab: Never
-BreakBeforeBraces: Linux
+BreakBeforeBraces: Allman
SpacesInParentheses: false
SpacesInSquareBrackets: false
SpacesInAngles : false
diff --git a/nDPId.c b/nDPId.c
index c1facbf94..2cfa375b0 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -28,9 +28,14 @@
#define MAX_IDLE_TIME 300000 /* msec */
#define INITIAL_THREAD_HASH 0x03dd018b
-enum nDPId_l3_type { L3_IP, L3_IP6 };
+enum nDPId_l3_type
+{
+ L3_IP,
+ L3_IP6
+};
-struct nDPId_flow_info {
+struct nDPId_flow_info
+{
uint32_t flow_id;
unsigned long long int packets_processed;
uint64_t first_seen;
@@ -39,11 +44,13 @@ struct nDPId_flow_info {
enum nDPId_l3_type l3_type;
union {
- struct {
+ struct
+ {
uint32_t src;
uint32_t dst;
} v4;
- struct {
+ struct
+ {
uint64_t src[2];
uint64_t dst[2];
} v6;
@@ -70,7 +77,8 @@ struct nDPId_flow_info {
struct ndpi_id_struct * ndpi_dst;
};
-struct nDPId_workflow {
+struct nDPId_workflow
+{
pcap_t * pcap_handle;
uint8_t error_or_eof : 1;
@@ -99,7 +107,8 @@ struct nDPId_workflow {
struct ndpi_detection_module_struct * ndpi_struct;
};
-struct nDPId_reader_thread {
+struct nDPId_reader_thread
+{
struct nDPId_workflow * workflow;
pthread_t thread_id;
#ifndef DISABLE_JSONIZER
@@ -109,8 +118,25 @@ struct nDPId_reader_thread {
int array_index;
};
-enum flow_event { FLOW_INVALID = 0, FLOW_NEW, FLOW_END, FLOW_IDLE, FLOW_GUESSED, FLOW_DETECTED, FLOW_NOT_DETECTED };
-enum basic_event { BASIC_INVALID = 0, NON_ETHERNET_OR_IP_PACKET, ETHERNET_PACKET_TOO_SHORT, ETHERNET_PACKET_UNKNOWN, IP4_PACKET_TOO_SHORT, IP6_PACKET_TOO_SHORT };
+enum flow_event
+{
+ FLOW_INVALID = 0,
+ FLOW_NEW,
+ FLOW_END,
+ FLOW_IDLE,
+ FLOW_GUESSED,
+ FLOW_DETECTED,
+ FLOW_NOT_DETECTED
+};
+enum basic_event
+{
+ BASIC_INVALID = 0,
+ NON_ETHERNET_OR_IP_PACKET,
+ ETHERNET_PACKET_TOO_SHORT,
+ ETHERNET_PACKET_UNKNOWN,
+ IP4_PACKET_TOO_SHORT,
+ IP6_PACKET_TOO_SHORT
+};
static struct nDPId_reader_thread reader_threads[MAX_READER_THREADS] = {};
static int reader_thread_count = MAX_READER_THREADS;
@@ -135,18 +161,23 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
char pcap_error_buffer[PCAP_ERRBUF_SIZE];
struct nDPId_workflow * workflow = (struct nDPId_workflow *)ndpi_calloc(1, sizeof(*workflow));
- if (workflow == NULL) {
+ if (workflow == NULL)
+ {
return NULL;
}
- if (access(file_or_device, R_OK) != 0 && errno == ENOENT) {
+ if (access(file_or_device, R_OK) != 0 && errno == ENOENT)
+ {
workflow->pcap_handle = pcap_open_live(file_or_device, /* 1536 */ 65535, 1, 250, pcap_error_buffer);
- } else {
+ }
+ else
+ {
workflow->pcap_handle =
pcap_open_offline_with_tstamp_precision(file_or_device, PCAP_TSTAMP_PRECISION_MICRO, pcap_error_buffer);
}
- if (workflow->pcap_handle == NULL) {
+ if (workflow->pcap_handle == NULL)
+ {
fprintf(stderr, "pcap_open_live / pcap_open_offline_with_tstamp_precision: %s\n", pcap_error_buffer);
free_workflow(&workflow);
return NULL;
@@ -154,7 +185,8 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
ndpi_init_prefs init_prefs = ndpi_no_prefs;
workflow->ndpi_struct = ndpi_init_detection_module(init_prefs);
- if (workflow->ndpi_struct == NULL) {
+ if (workflow->ndpi_struct == NULL)
+ {
free_workflow(&workflow);
return NULL;
}
@@ -162,7 +194,8 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
workflow->total_active_flows = 0;
workflow->max_active_flows = MAX_FLOW_ROOTS_PER_THREAD;
workflow->ndpi_flows_active = (void **)ndpi_calloc(workflow->max_active_flows, sizeof(void *));
- if (workflow->ndpi_flows_active == NULL) {
+ if (workflow->ndpi_flows_active == NULL)
+ {
free_workflow(&workflow);
return NULL;
}
@@ -170,7 +203,8 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
workflow->total_idle_flows = 0;
workflow->max_idle_flows = MAX_IDLE_FLOWS_PER_THREAD;
workflow->ndpi_flows_idle = (void **)ndpi_calloc(workflow->max_idle_flows, sizeof(void *));
- if (workflow->ndpi_flows_idle == NULL) {
+ if (workflow->ndpi_flows_idle == NULL)
+ {
free_workflow(&workflow);
return NULL;
}
@@ -180,7 +214,8 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
ndpi_set_protocol_detection_bitmask2(workflow->ndpi_struct, &protos);
ndpi_finalize_initalization(workflow->ndpi_struct);
- if (ndpi_init_serializer_ll(&workflow->ndpi_serializer, ndpi_serialization_format_json, BUFSIZ) != 1) {
+ if (ndpi_init_serializer_ll(&workflow->ndpi_serializer, ndpi_serialization_format_json, BUFSIZ) != 1)
+ {
return NULL;
}
@@ -201,19 +236,23 @@ static void free_workflow(struct nDPId_workflow ** const workflow)
{
struct nDPId_workflow * const w = *workflow;
- if (w == NULL) {
+ if (w == NULL)
+ {
return;
}
- if (w->pcap_handle != NULL) {
+ if (w->pcap_handle != NULL)
+ {
pcap_close(w->pcap_handle);
w->pcap_handle = NULL;
}
- if (w->ndpi_struct != NULL) {
+ if (w->ndpi_struct != NULL)
+ {
ndpi_exit_detection_module(w->ndpi_struct);
}
- for (size_t i = 0; i < w->max_active_flows; i++) {
+ for (size_t i = 0; i < w->max_active_flows; i++)
+ {
ndpi_tdestroy(w->ndpi_flows_active[i], ndpi_flow_info_freer);
}
ndpi_free(w->ndpi_flows_active);
@@ -228,23 +267,30 @@ static int setup_reader_threads(char const * const file_or_device)
char const * file_or_default_device;
char pcap_error_buffer[PCAP_ERRBUF_SIZE];
- if (reader_thread_count > MAX_READER_THREADS) {
+ if (reader_thread_count > MAX_READER_THREADS)
+ {
return 1;
}
- if (file_or_device == NULL) {
+ if (file_or_device == NULL)
+ {
file_or_default_device = pcap_lookupdev(pcap_error_buffer);
- if (file_or_default_device == NULL) {
+ if (file_or_default_device == NULL)
+ {
fprintf(stderr, "pcap_lookupdev: %s\n", pcap_error_buffer);
return 1;
}
- } else {
+ }
+ else
+ {
file_or_default_device = file_or_device;
}
- for (int i = 0; i < reader_thread_count; ++i) {
+ for (int i = 0; i < reader_thread_count; ++i)
+ {
reader_threads[i].workflow = init_workflow(file_or_default_device);
- if (reader_threads[i].workflow == NULL) {
+ if (reader_threads[i].workflow == NULL)
+ {
return 1;
}
}
@@ -258,7 +304,8 @@ static int ip_tuple_to_string(struct nDPId_flow_info const * const flow,
char * const dst_addr_str,
size_t dst_addr_len)
{
- switch (flow->l3_type) {
+ switch (flow->l3_type)
+ {
case L3_IP:
return inet_ntop(AF_INET, (struct sockaddr_in *)&flow->ip_tuple.v4.src, src_addr_str, src_addr_len) !=
NULL &&
@@ -292,20 +339,26 @@ static void print_packet_info(struct nDPId_reader_thread const * const reader_th
reader_thread->array_index,
flow->flow_id,
header->caplen);
- if (ret > 0) {
+ if (ret > 0)
+ {
used += ret;
}
- if (ip_tuple_to_string(flow, src_addr_str, sizeof(src_addr_str), dst_addr_str, sizeof(dst_addr_str)) != 0) {
+ if (ip_tuple_to_string(flow, src_addr_str, sizeof(src_addr_str), dst_addr_str, sizeof(dst_addr_str)) != 0)
+ {
ret = snprintf(buf + used, sizeof(buf) - used, "IP[%s -> %s]", src_addr_str, dst_addr_str);
- } else {
+ }
+ else
+ {
ret = snprintf(buf + used, sizeof(buf) - used, "IP[ERROR]");
}
- if (ret > 0) {
+ if (ret > 0)
+ {
used += ret;
}
- switch (flow->l4_protocol) {
+ switch (flow->l4_protocol)
+ {
case IPPROTO_UDP:
ret = snprintf(buf + used,
sizeof(buf) - used,
@@ -335,7 +388,8 @@ static void print_packet_info(struct nDPId_reader_thread const * const reader_th
ret = snprintf(buf + used, sizeof(buf) - used, " -> Unknown[0x%X]", flow->l4_protocol);
break;
}
- if (ret > 0) {
+ if (ret > 0)
+ {
used += ret;
}
@@ -346,14 +400,18 @@ static void print_packet_info(struct nDPId_reader_thread const * const reader_th
static int ip_tuples_equal(struct nDPId_flow_info const * const A, struct nDPId_flow_info const * const B)
{
// generate a warning if the enum changes
- switch (A->l3_type) {
+ switch (A->l3_type)
+ {
case L3_IP:
case L3_IP6:
break;
}
- if (A->l3_type == L3_IP && B->l3_type == L3_IP6) {
+ if (A->l3_type == L3_IP && B->l3_type == L3_IP6)
+ {
return A->ip_tuple.v4.src == B->ip_tuple.v4.src && A->ip_tuple.v4.dst == B->ip_tuple.v4.dst;
- } else if (A->l3_type == L3_IP6 && B->l3_type == L3_IP6) {
+ }
+ else if (A->l3_type == L3_IP6 && B->l3_type == L3_IP6)
+ {
return A->ip_tuple.v6.src[0] == B->ip_tuple.v6.src[0] && A->ip_tuple.v6.src[1] == B->ip_tuple.v6.src[1] &&
A->ip_tuple.v6.dst[0] == B->ip_tuple.v6.dst[0] && A->ip_tuple.v6.dst[1] == B->ip_tuple.v6.dst[1];
}
@@ -363,31 +421,42 @@ static int ip_tuples_equal(struct nDPId_flow_info const * const A, struct nDPId_
static int ip_tuples_compare(struct nDPId_flow_info const * const A, struct nDPId_flow_info const * const B)
{
// generate a warning if the enum changes
- switch (A->l3_type) {
+ switch (A->l3_type)
+ {
case L3_IP:
case L3_IP6:
break;
}
- if (A->l3_type == L3_IP && B->l3_type == L3_IP6) {
- if (A->ip_tuple.v4.src < B->ip_tuple.v4.src || A->ip_tuple.v4.dst < B->ip_tuple.v4.dst) {
+ if (A->l3_type == L3_IP && B->l3_type == L3_IP6)
+ {
+ if (A->ip_tuple.v4.src < B->ip_tuple.v4.src || A->ip_tuple.v4.dst < B->ip_tuple.v4.dst)
+ {
return -1;
}
- if (A->ip_tuple.v4.src > B->ip_tuple.v4.src || A->ip_tuple.v4.dst > B->ip_tuple.v4.dst) {
+ if (A->ip_tuple.v4.src > B->ip_tuple.v4.src || A->ip_tuple.v4.dst > B->ip_tuple.v4.dst)
+ {
return 1;
}
- } else if (A->l3_type == L3_IP6 && B->l3_type == L3_IP6) {
+ }
+ else if (A->l3_type == L3_IP6 && B->l3_type == L3_IP6)
+ {
if ((A->ip_tuple.v6.src[0] < B->ip_tuple.v6.src[0] && A->ip_tuple.v6.src[1] < B->ip_tuple.v6.src[1]) ||
- (A->ip_tuple.v6.dst[0] < B->ip_tuple.v6.dst[0] && A->ip_tuple.v6.dst[1] < B->ip_tuple.v6.dst[1])) {
+ (A->ip_tuple.v6.dst[0] < B->ip_tuple.v6.dst[0] && A->ip_tuple.v6.dst[1] < B->ip_tuple.v6.dst[1]))
+ {
return -1;
}
if ((A->ip_tuple.v6.src[0] > B->ip_tuple.v6.src[0] && A->ip_tuple.v6.src[1] > B->ip_tuple.v6.src[1]) ||
- (A->ip_tuple.v6.dst[0] > B->ip_tuple.v6.dst[0] && A->ip_tuple.v6.dst[1] > B->ip_tuple.v6.dst[1])) {
+ (A->ip_tuple.v6.dst[0] > B->ip_tuple.v6.dst[0] && A->ip_tuple.v6.dst[1] > B->ip_tuple.v6.dst[1]))
+ {
return 1;
}
}
- if (A->src_port < B->src_port || A->dst_port < B->dst_port) {
+ if (A->src_port < B->src_port || A->dst_port < B->dst_port)
+ {
return -1;
- } else if (A->src_port > B->src_port || A->dst_port > B->dst_port) {
+ }
+ else if (A->src_port > B->src_port || A->dst_port > B->dst_port)
+ {
return 1;
}
return 0;
@@ -400,17 +469,21 @@ static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int de
(void)depth;
- if (workflow == NULL || flow == NULL) {
+ if (workflow == NULL || flow == NULL)
+ {
return;
}
- if (workflow->cur_idle_flows == MAX_IDLE_FLOWS_PER_THREAD) {
+ if (workflow->cur_idle_flows == MAX_IDLE_FLOWS_PER_THREAD)
+ {
return;
}
- if (which == ndpi_preorder || which == ndpi_leaf) {
+ if (which == ndpi_preorder || which == ndpi_leaf)
+ {
if ((flow->flow_fin_ack_seen == 1 && flow->flow_ack_seen == 1) ||
- flow->last_seen + MAX_IDLE_TIME < workflow->last_time) {
+ flow->last_seen + MAX_IDLE_TIME < workflow->last_time)
+ {
char src_addr_str[INET6_ADDRSTRLEN + 1];
char dst_addr_str[INET6_ADDRSTRLEN + 1];
ip_tuple_to_string(flow, src_addr_str, sizeof(src_addr_str), dst_addr_str, sizeof(dst_addr_str));
@@ -425,21 +498,28 @@ static int ndpi_workflow_node_cmp(void const * const A, void const * const B)
struct nDPId_flow_info const * const flow_info_a = (struct nDPId_flow_info *)A;
struct nDPId_flow_info const * const flow_info_b = (struct nDPId_flow_info *)B;
- if (flow_info_a->hashval < flow_info_b->hashval) {
+ if (flow_info_a->hashval < flow_info_b->hashval)
+ {
return (-1);
- } else if (flow_info_a->hashval > flow_info_b->hashval) {
+ }
+ else if (flow_info_a->hashval > flow_info_b->hashval)
+ {
return (1);
}
/* Flows have the same hash */
- if (flow_info_a->l4_protocol < flow_info_b->l4_protocol) {
+ if (flow_info_a->l4_protocol < flow_info_b->l4_protocol)
+ {
return (-1);
- } else if (flow_info_a->l4_protocol > flow_info_b->l4_protocol) {
+ }
+ else if (flow_info_a->l4_protocol > flow_info_b->l4_protocol)
+ {
return (1);
}
if (ip_tuples_equal(flow_info_a, flow_info_b) != 0 && flow_info_a->src_port == flow_info_b->src_port &&
- flow_info_a->dst_port == flow_info_b->dst_port) {
+ flow_info_a->dst_port == flow_info_b->dst_port)
+ {
return (0);
}
@@ -450,17 +530,23 @@ static void check_for_idle_flows(struct nDPId_reader_thread * const reader_threa
{
struct nDPId_workflow * const workflow = reader_thread->workflow;
- if (workflow->last_idle_scan_time + IDLE_SCAN_PERIOD < workflow->last_time) {
- for (size_t idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index) {
+ if (workflow->last_idle_scan_time + IDLE_SCAN_PERIOD < workflow->last_time)
+ {
+ for (size_t idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index)
+ {
ndpi_twalk(workflow->ndpi_flows_active[idle_scan_index], ndpi_idle_scan_walker, workflow);
- while (workflow->cur_idle_flows > 0) {
+ while (workflow->cur_idle_flows > 0)
+ {
struct nDPId_flow_info * const f =
(struct nDPId_flow_info *)workflow->ndpi_flows_idle[--workflow->cur_idle_flows];
#ifdef DISABLE_JSONIZER
- if (f->flow_fin_ack_seen == 1) {
+ if (f->flow_fin_ack_seen == 1)
+ {
printf("Free fin flow with id %u\n", f->flow_id);
- } else {
+ }
+ else
+ {
printf("Free idle flow with id %u\n", f->flow_id);
}
#else
@@ -483,7 +569,8 @@ static int jsonize_l3_l4_dpi(struct nDPId_workflow * const workflow, struct nDPI
char src_name[32] = {};
char dst_name[32] = {};
- switch (flow->l3_type) {
+ switch (flow->l3_type)
+ {
case L3_IP:
ndpi_serialize_string_string(serializer, "l3_proto", "ip4");
inet_ntop(AF_INET, &flow->ip_tuple.v4.src, src_name, sizeof(src_name));
@@ -502,14 +589,17 @@ static int jsonize_l3_l4_dpi(struct nDPId_workflow * const workflow, struct nDPI
ndpi_serialize_string_string(serializer, "src_ip", src_name);
ndpi_serialize_string_string(serializer, "dest_ip", dst_name);
- if (flow->src_port) {
+ if (flow->src_port)
+ {
ndpi_serialize_string_uint32(serializer, "src_port", flow->src_port);
}
- if (flow->dst_port) {
+ if (flow->dst_port)
+ {
ndpi_serialize_string_uint32(serializer, "dst_port", flow->dst_port);
}
- switch (flow->l4_protocol) {
+ switch (flow->l4_protocol)
+ {
case IPPROTO_TCP:
ndpi_serialize_string_string(serializer, "l4_proto", "tcp");
break;
@@ -538,8 +628,7 @@ static void jsonize_basic(struct nDPId_reader_thread * const reader_thread)
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "packet_id", workflow->packets_captured);
}
-static void jsonize_flow(struct nDPId_workflow * const workflow,
- struct nDPId_flow_info const * const flow)
+static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_flow_info const * const flow)
{
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "flow_id", flow->flow_id);
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_l4_data_len", flow->total_l4_data_len);
@@ -550,7 +639,8 @@ static void jsonize_flow(struct nDPId_workflow * const workflow,
(flow->packets_processed > 0 ? flow->total_l4_data_len / flow->packets_processed : 0));
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "midstream", flow->is_midstream_flow);
- if (jsonize_l3_l4_dpi(workflow, flow) != 0) {
+ if (jsonize_l3_l4_dpi(workflow, flow) != 0)
+ {
syslog(LOG_DAEMON | LOG_ERR,
"[%8llu, %4u] flow2json/dpi2json failed\n",
workflow->packets_captured,
@@ -565,7 +655,8 @@ static int connect_to_json_socket(struct nDPId_reader_thread * const reader_thre
close(reader_thread->json_sockfd);
reader_thread->json_sockfd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
- if (reader_thread->json_sockfd < 0) {
+ if (reader_thread->json_sockfd < 0)
+ {
reader_thread->json_sock_reconnect = 1;
return 1;
}
@@ -578,7 +669,8 @@ static int connect_to_json_socket(struct nDPId_reader_thread * const reader_thre
return 1;
}
- if (fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) | O_NONBLOCK) == -1) {
+ if (fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) | O_NONBLOCK) == -1)
+ {
reader_thread->json_sock_reconnect = 1;
return 1;
}
@@ -589,7 +681,8 @@ static int connect_to_json_socket(struct nDPId_reader_thread * const reader_thre
}
static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
- char const * const json_str, size_t json_str_len)
+ char const * const json_str,
+ size_t json_str_len)
{
struct nDPId_workflow * const workflow = reader_thread->workflow;
int saved_errno;
@@ -597,7 +690,8 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
char newline_json_str[BUFSIZ];
s_ret = snprintf(newline_json_str, sizeof(newline_json_str), "%.*s\n", (int)json_str_len, json_str);
- if (s_ret < 0 || s_ret > (int)sizeof(newline_json_str)) {
+ if (s_ret < 0 || s_ret > (int)sizeof(newline_json_str))
+ {
syslog(LOG_DAEMON | LOG_ERR,
"[%8llu, %d] JSON buffer prepare failed",
workflow->packets_captured,
@@ -605,8 +699,10 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
return;
}
- if (reader_thread->json_sock_reconnect != 0) {
- if (connect_to_json_socket(reader_thread) == 0) {
+ if (reader_thread->json_sock_reconnect != 0)
+ {
+ if (connect_to_json_socket(reader_thread) == 0)
+ {
syslog(LOG_DAEMON | LOG_ERR,
"[%8llu, %d] Reconnected to JSON sink",
workflow->packets_captured,
@@ -623,7 +719,8 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
workflow->packets_captured,
reader_thread->array_index,
strerror(saved_errno));
- if (saved_errno == EPIPE) {
+ if (saved_errno == EPIPE)
+ {
syslog(LOG_DAEMON | LOG_ERR,
"[%8llu, %d] Lost connection to JSON sink",
workflow->packets_captured,
@@ -639,14 +736,17 @@ static void serialize_and_send(struct nDPId_reader_thread * const reader_thread)
uint32_t json_str_len;
json_str = ndpi_serializer_get_buffer(&reader_thread->workflow->ndpi_serializer, &json_str_len);
- if (json_str == NULL || json_str_len == 0) {
+ if (json_str == NULL || json_str_len == 0)
+ {
syslog(LOG_DAEMON | LOG_ERR,
"[%8llu, %d] jsonize failed, buffer length: %u\n",
reader_thread->workflow->packets_captured,
reader_thread->array_index,
json_str_len);
- } else {
+ }
+ else
+ {
send_to_json_sink(reader_thread, json_str, json_str_len);
}
@@ -662,7 +762,8 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
ndpi_serialize_string_int32(&workflow->ndpi_serializer, "flow_event_id", event);
- switch (event) {
+ switch (event)
+ {
case FLOW_INVALID:
ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, "invalid");
break;
@@ -692,48 +793,54 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
static void jsonize_format_error(struct nDPId_reader_thread * const reader_thread, uint32_t format_index)
{
- ndpi_serialize_string_string(&reader_thread->workflow->ndpi_serializer,
- "serializer-error", "format");
- ndpi_serialize_string_uint32(&reader_thread->workflow->ndpi_serializer,
- "serializer-format-index", format_index);
+ ndpi_serialize_string_string(&reader_thread->workflow->ndpi_serializer, "serializer-error", "format");
+ ndpi_serialize_string_uint32(&reader_thread->workflow->ndpi_serializer, "serializer-format-index", format_index);
serialize_and_send(reader_thread);
}
-static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thread,
- char const * format, va_list ap)
+static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thread, char const * format, va_list ap)
{
uint8_t got_jsonkey = 0;
uint8_t is_long_long = 0;
char json_key[BUFSIZ];
uint32_t format_index = 0;
- while (*format) {
- if (got_jsonkey == 0) {
+ while (*format)
+ {
+ if (got_jsonkey == 0)
+ {
json_key[0] = '\0';
}
- switch (*format++) {
- case 's': {
+ switch (*format++)
+ {
+ case 's':
+ {
format_index++;
char * value = va_arg(ap, char *);
- if (got_jsonkey == 0) {
+ if (got_jsonkey == 0)
+ {
snprintf(json_key, sizeof(json_key), "%s", value);
got_jsonkey = 1;
- } else {
- ndpi_serialize_string_string(&reader_thread->workflow->ndpi_serializer,
- json_key, value);
+ }
+ else
+ {
+ ndpi_serialize_string_string(&reader_thread->workflow->ndpi_serializer, json_key, value);
got_jsonkey = 0;
}
break;
}
- case 'f': {
+ case 'f':
+ {
format_index++;
- if (got_jsonkey == 1) {
+ if (got_jsonkey == 1)
+ {
float value = va_arg(ap, double);
- ndpi_serialize_string_float(&reader_thread->workflow->ndpi_serializer,
- json_key, value, "%.2f");
+ ndpi_serialize_string_float(&reader_thread->workflow->ndpi_serializer, json_key, value, "%.2f");
got_jsonkey = 0;
- } else {
+ }
+ else
+ {
jsonize_format_error(reader_thread, format_index);
return;
}
@@ -741,37 +848,50 @@ static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thre
}
case 'l':
format_index++;
- if (got_jsonkey != 1) {
+ if (got_jsonkey != 1)
+ {
jsonize_format_error(reader_thread, format_index);
return;
}
- if (*format == 'l') {
+ if (*format == 'l')
+ {
format++;
is_long_long = 1;
- } else {
+ }
+ else
+ {
is_long_long = 0;
}
- if (*format == 'd') {
+ if (*format == 'd')
+ {
long long int value;
- if (is_long_long != 0) {
+ if (is_long_long != 0)
+ {
value = va_arg(ap, long long int);
- } else {
+ }
+ else
+ {
value = va_arg(ap, long int);
}
- ndpi_serialize_string_int64(&reader_thread->workflow->ndpi_serializer,
- json_key, value);
+ ndpi_serialize_string_int64(&reader_thread->workflow->ndpi_serializer, json_key, value);
got_jsonkey = 0;
- } else if (*format == 'u') {
+ }
+ else if (*format == 'u')
+ {
unsigned long long int value;
- if (is_long_long != 0) {
+ if (is_long_long != 0)
+ {
value = va_arg(ap, unsigned long long int);
- } else {
+ }
+ else
+ {
value = va_arg(ap, unsigned long int);
}
- ndpi_serialize_string_uint64(&reader_thread->workflow->ndpi_serializer,
- json_key, value);
+ ndpi_serialize_string_uint64(&reader_thread->workflow->ndpi_serializer, json_key, value);
got_jsonkey = 0;
- } else {
+ }
+ else
+ {
jsonize_format_error(reader_thread, format_index);
return;
}
@@ -779,24 +899,28 @@ static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thre
break;
case 'u':
format_index++;
- if (got_jsonkey == 1) {
+ if (got_jsonkey == 1)
+ {
unsigned int value = va_arg(ap, unsigned int);
- ndpi_serialize_string_uint32(&reader_thread->workflow->ndpi_serializer,
- json_key, value);
+ ndpi_serialize_string_uint32(&reader_thread->workflow->ndpi_serializer, json_key, value);
got_jsonkey = 0;
- } else {
+ }
+ else
+ {
jsonize_format_error(reader_thread, format_index);
return;
}
break;
case 'd':
format_index++;
- if (got_jsonkey == 1) {
+ if (got_jsonkey == 1)
+ {
int value = va_arg(ap, int);
- ndpi_serialize_string_int32(&reader_thread->workflow->ndpi_serializer,
- json_key, value);
+ ndpi_serialize_string_int32(&reader_thread->workflow->ndpi_serializer, json_key, value);
got_jsonkey = 0;
- } else {
+ }
+ else
+ {
jsonize_format_error(reader_thread, format_index);
return;
}
@@ -812,46 +936,42 @@ static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thre
}
}
-__attribute__ ((format (printf, 3, 4)))
-static void jsonize_basic_eventf(struct nDPId_reader_thread * const reader_thread,
- enum basic_event event,
- char const * format, ...)
+__attribute__((format(printf, 3, 4))) static void jsonize_basic_eventf(struct nDPId_reader_thread * const reader_thread,
+ enum basic_event event,
+ char const * format,
+ ...)
{
struct nDPId_workflow * const workflow = reader_thread->workflow;
va_list ap;
char const ev[] = "basic_event_name";
- ndpi_serialize_string_int32(&reader_thread->workflow->ndpi_serializer,
- "basic_event_id", event);
+ ndpi_serialize_string_int32(&reader_thread->workflow->ndpi_serializer, "basic_event_id", event);
- switch (event) {
+ switch (event)
+ {
case BASIC_INVALID:
ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, "invalid");
break;
case NON_ETHERNET_OR_IP_PACKET:
- ndpi_serialize_string_string(&workflow->ndpi_serializer, ev,
- "Captured non IP/Ethernet packet - skipping");
+ ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, "Captured non IP/Ethernet packet - skipping");
break;
case ETHERNET_PACKET_TOO_SHORT:
- ndpi_serialize_string_string(&workflow->ndpi_serializer, ev,
- "Ethernet packet too short - skipping");
+ ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, "Ethernet packet too short - skipping");
break;
case ETHERNET_PACKET_UNKNOWN:
- ndpi_serialize_string_string(&workflow->ndpi_serializer, ev,
- "Unknown Ethernet packet type - skipping");
+ ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, "Unknown Ethernet packet type - skipping");
break;
case IP4_PACKET_TOO_SHORT:
- ndpi_serialize_string_string(&workflow->ndpi_serializer, ev,
- "IP4 packet too short - skipping");
+ ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, "IP4 packet too short - skipping");
break;
case IP6_PACKET_TOO_SHORT:
- ndpi_serialize_string_string(&workflow->ndpi_serializer, ev,
- "IP6 packet too short - skipping");
+ ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, "IP6 packet too short - skipping");
break;
}
jsonize_basic(reader_thread);
- if (format != NULL) {
+ if (format != NULL)
+ {
va_start(ap, format);
vjsonize_basic_eventf(reader_thread, format, ap);
va_end(ap);
@@ -892,12 +1012,14 @@ static void ndpi_process_packet(uint8_t * const args,
uint16_t type;
int thread_index = INITIAL_THREAD_HASH; // generated with `dd if=/dev/random bs=1024 count=1 |& hd'
- if (reader_thread == NULL) {
+ if (reader_thread == NULL)
+ {
return;
}
workflow = reader_thread->workflow;
- if (workflow == NULL) {
+ if (workflow == NULL)
+ {
return;
}
@@ -908,32 +1030,40 @@ static void ndpi_process_packet(uint8_t * const args,
check_for_idle_flows(reader_thread);
/* process datalink layer */
- switch (pcap_datalink(workflow->pcap_handle)) {
+ switch (pcap_datalink(workflow->pcap_handle))
+ {
case DLT_NULL:
- if (ntohl(*((uint32_t *)&packet[eth_offset])) == 0x00000002) {
+ if (ntohl(*((uint32_t *)&packet[eth_offset])) == 0x00000002)
+ {
type = ETH_P_IP;
- } else {
+ }
+ else
+ {
type = ETH_P_IPV6;
}
ip_offset = 4 + eth_offset;
break;
case DLT_EN10MB:
- if (header->len < sizeof(struct ndpi_ethhdr)) {
+ if (header->len < sizeof(struct ndpi_ethhdr))
+ {
jsonize_basic_eventf(reader_thread, ETHERNET_PACKET_TOO_SHORT, NULL);
return;
}
ethernet = (struct ndpi_ethhdr *)&packet[eth_offset];
ip_offset = sizeof(struct ndpi_ethhdr) + eth_offset;
type = ntohs(ethernet->h_proto);
- switch (type) {
+ switch (type)
+ {
case ETH_P_IP: /* IPv4 */
- if (header->len < sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_iphdr)) {
+ if (header->len < sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_iphdr))
+ {
jsonize_basic_eventf(reader_thread, IP4_PACKET_TOO_SHORT, NULL);
return;
}
break;
case ETH_P_IPV6: /* IPV6 */
- if (header->len < sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_ipv6hdr)) {
+ if (header->len < sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_ipv6hdr))
+ {
jsonize_basic_eventf(reader_thread, IP6_PACKET_TOO_SHORT, NULL);
return;
}
@@ -946,17 +1076,23 @@ static void ndpi_process_packet(uint8_t * const args,
}
break;
default:
- jsonize_basic_eventf(reader_thread, NON_ETHERNET_OR_IP_PACKET, "%s%u", "type", pcap_datalink(workflow->pcap_handle));
+ jsonize_basic_eventf(
+ reader_thread, NON_ETHERNET_OR_IP_PACKET, "%s%u", "type", pcap_datalink(workflow->pcap_handle));
return;
}
- if (type == ETH_P_IP) {
+ if (type == ETH_P_IP)
+ {
ip = (struct ndpi_iphdr *)&packet[ip_offset];
ip6 = NULL;
- } else if (type == ETH_P_IPV6) {
+ }
+ else if (type == ETH_P_IPV6)
+ {
ip = NULL;
ip6 = (struct ndpi_ipv6hdr *)&packet[ip_offset];
- } else {
+ }
+ else
+ {
syslog(LOG_DAEMON | LOG_WARNING,
"[%8llu, %d] Captured non IPv4/IPv6 packet with type 0x%X - skipping\n",
workflow->packets_captured,
@@ -966,8 +1102,10 @@ static void ndpi_process_packet(uint8_t * const args,
}
ip_size = header->len - ip_offset;
- if (type == ETH_P_IP && header->len >= ip_offset) {
- if (header->caplen < header->len) {
+ if (type == ETH_P_IP && header->len >= ip_offset)
+ {
+ if (header->caplen < header->len)
+ {
syslog(LOG_DAEMON | LOG_WARNING,
"[%8llu, %d] Captured packet size is smaller than packet size: %u < %u\n",
workflow->packets_captured,
@@ -978,8 +1116,10 @@ static void ndpi_process_packet(uint8_t * const args,
}
/* process layer3 e.g. IPv4 / IPv6 */
- if (ip != NULL && ip->version == 4) {
- if (ip_size < sizeof(*ip)) {
+ if (ip != NULL && ip->version == 4)
+ {
+ if (ip_size < sizeof(*ip))
+ {
syslog(LOG_DAEMON | LOG_WARNING,
"[%8llu, %d] Packet smaller than IP4 header length: %u < %zu\n",
workflow->packets_captured,
@@ -991,7 +1131,8 @@ static void ndpi_process_packet(uint8_t * const args,
flow.l3_type = L3_IP;
if (ndpi_detection_get_l4(
- (uint8_t *)ip, ip_size, &l4_ptr, &l4_len, &flow.l4_protocol, NDPI_DETECTION_ONLY_IPV4) != 0) {
+ (uint8_t *)ip, ip_size, &l4_ptr, &l4_len, &flow.l4_protocol, NDPI_DETECTION_ONLY_IPV4) != 0)
+ {
syslog(LOG_DAEMON | LOG_WARNING,
"[%8llu, %d] nDPI IPv4/L4 payload detection failed, L4 length: %zu\n",
workflow->packets_captured,
@@ -1004,8 +1145,11 @@ static void ndpi_process_packet(uint8_t * const args,
flow.ip_tuple.v4.dst = ip->daddr;
uint32_t min_addr = (flow.ip_tuple.v4.src > flow.ip_tuple.v4.dst ? flow.ip_tuple.v4.dst : flow.ip_tuple.v4.src);
thread_index = min_addr + ip->protocol;
- } else if (ip6 != NULL) {
- if (ip_size < sizeof(ip6->ip6_hdr)) {
+ }
+ else if (ip6 != NULL)
+ {
+ if (ip_size < sizeof(ip6->ip6_hdr))
+ {
syslog(LOG_DAEMON | LOG_WARNING,
"[%8llu, %d] Packet smaller than IP6 header length: %u < %zu\n",
workflow->packets_captured,
@@ -1017,7 +1161,8 @@ static void ndpi_process_packet(uint8_t * const args,
flow.l3_type = L3_IP6;
if (ndpi_detection_get_l4(
- (uint8_t *)ip6, ip_size, &l4_ptr, &l4_len, &flow.l4_protocol, NDPI_DETECTION_ONLY_IPV6) != 0) {
+ (uint8_t *)ip6, ip_size, &l4_ptr, &l4_len, &flow.l4_protocol, NDPI_DETECTION_ONLY_IPV6) != 0)
+ {
syslog(LOG_DAEMON | LOG_WARNING,
"[%8llu, %d] nDPI IPv6/L4 payload detection failed, L4 length: %zu\n",
workflow->packets_captured,
@@ -1031,15 +1176,20 @@ static void ndpi_process_packet(uint8_t * const args,
flow.ip_tuple.v6.dst[0] = ip6->ip6_dst.u6_addr.u6_addr64[0];
flow.ip_tuple.v6.dst[1] = ip6->ip6_dst.u6_addr.u6_addr64[1];
uint64_t min_addr[2];
- if (flow.ip_tuple.v6.src[0] > flow.ip_tuple.v6.dst[0] && flow.ip_tuple.v6.src[1] > flow.ip_tuple.v6.dst[1]) {
+ if (flow.ip_tuple.v6.src[0] > flow.ip_tuple.v6.dst[0] && flow.ip_tuple.v6.src[1] > flow.ip_tuple.v6.dst[1])
+ {
min_addr[0] = flow.ip_tuple.v6.dst[0];
min_addr[1] = flow.ip_tuple.v6.dst[0];
- } else {
+ }
+ else
+ {
min_addr[0] = flow.ip_tuple.v6.src[0];
min_addr[1] = flow.ip_tuple.v6.src[0];
}
thread_index = min_addr[0] + min_addr[1] + ip6->ip6_hdr.ip6_un1_nxt;
- } else {
+ }
+ else
+ {
syslog(LOG_DAEMON | LOG_WARNING,
"[%8llu, %d] Non IP/IPv6 protocol detected: 0x%X\n",
workflow->packets_captured,
@@ -1049,10 +1199,12 @@ static void ndpi_process_packet(uint8_t * const args,
}
/* process layer4 e.g. TCP / UDP */
- if (flow.l4_protocol == IPPROTO_TCP) {
+ if (flow.l4_protocol == IPPROTO_TCP)
+ {
const struct ndpi_tcphdr * tcp;
- if (header->len < (l4_ptr - packet) + sizeof(struct ndpi_tcphdr)) {
+ if (header->len < (l4_ptr - packet) + sizeof(struct ndpi_tcphdr))
+ {
syslog(LOG_DAEMON | LOG_WARNING,
"[%8llu, %d] Malformed TCP packet, packet size smaller than expected: %u < %zu\n",
workflow->packets_captured,
@@ -1067,10 +1219,13 @@ static void ndpi_process_packet(uint8_t * const args,
flow.flow_ack_seen = tcp->ack;
flow.src_port = ntohs(tcp->source);
flow.dst_port = ntohs(tcp->dest);
- } else if (flow.l4_protocol == IPPROTO_UDP) {
+ }
+ else if (flow.l4_protocol == IPPROTO_UDP)
+ {
const struct ndpi_udphdr * udp;
- if (header->len < (l4_ptr - packet) + sizeof(struct ndpi_udphdr)) {
+ if (header->len < (l4_ptr - packet) + sizeof(struct ndpi_udphdr))
+ {
syslog(LOG_DAEMON | LOG_WARNING,
"[%8llu, %d] Malformed UDP packet, packet size smaller than expected: %u < %zu\n",
workflow->packets_captured,
@@ -1087,7 +1242,8 @@ static void ndpi_process_packet(uint8_t * const args,
/* distribute flows to threads while keeping stability (same flow goes always to same thread) */
thread_index += (flow.src_port < flow.dst_port ? flow.dst_port : flow.src_port);
thread_index %= reader_thread_count;
- if (thread_index != reader_thread->array_index) {
+ if (thread_index != reader_thread->array_index)
+ {
return;
}
workflow->packets_processed++;
@@ -1098,7 +1254,8 @@ static void ndpi_process_packet(uint8_t * const args,
#endif
/* calculate flow hash for btree find, search(insert) */
- switch (flow.l3_type) {
+ switch (flow.l3_type)
+ {
case L3_IP:
if (ndpi_flowv4_flow_hash(flow.l4_protocol,
flow.ip_tuple.v4.src,
@@ -1108,7 +1265,8 @@ static void ndpi_process_packet(uint8_t * const args,
0,
0,
(uint8_t *)&flow.hashval,
- sizeof(flow.hashval)) != 0) {
+ sizeof(flow.hashval)) != 0)
+ {
flow.hashval = flow.ip_tuple.v4.src + flow.ip_tuple.v4.dst; // fallback
}
break;
@@ -1121,7 +1279,8 @@ static void ndpi_process_packet(uint8_t * const args,
0,
0,
(uint8_t *)&flow.hashval,
- sizeof(flow.hashval)) != 0) {
+ sizeof(flow.hashval)) != 0)
+ {
flow.hashval = flow.ip_tuple.v6.src[0] + flow.ip_tuple.v6.src[1];
flow.hashval += flow.ip_tuple.v6.dst[0] + flow.ip_tuple.v6.dst[1];
}
@@ -1131,7 +1290,8 @@ static void ndpi_process_packet(uint8_t * const args,
hashed_index = flow.hashval % workflow->max_active_flows;
tree_result = ndpi_tfind(&flow, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp);
- if (tree_result == NULL) {
+ if (tree_result == NULL)
+ {
/* flow not found in btree: switch src <-> dst and try to find it again */
uint64_t orig_src_ip[2] = {flow.ip_tuple.v6.src[0], flow.ip_tuple.v6.src[1]};
uint64_t orig_dst_ip[2] = {flow.ip_tuple.v6.dst[0], flow.ip_tuple.v6.dst[1]};
@@ -1146,7 +1306,8 @@ static void ndpi_process_packet(uint8_t * const args,
flow.dst_port = orig_src_port;
tree_result = ndpi_tfind(&flow, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp);
- if (tree_result != NULL) {
+ if (tree_result != NULL)
+ {
direction_changed = 1;
}
@@ -1158,9 +1319,11 @@ static void ndpi_process_packet(uint8_t * const args,
flow.dst_port = orig_dst_port;
}
- if (tree_result == NULL) {
+ if (tree_result == NULL)
+ {
/* flow still not found, must be new */
- if (workflow->cur_active_flows == workflow->max_active_flows) {
+ if (workflow->cur_active_flows == workflow->max_active_flows)
+ {
syslog(LOG_DAEMON | LOG_WARNING,
"[%8llu, %d] max flows to track reached: %llu, idle: %llu\n",
workflow->packets_captured,
@@ -1171,7 +1334,8 @@ static void ndpi_process_packet(uint8_t * const args,
}
flow_to_process = (struct nDPId_flow_info *)ndpi_malloc(sizeof(*flow_to_process));
- if (flow_to_process == NULL) {
+ if (flow_to_process == NULL)
+ {
syslog(LOG_DAEMON | LOG_WARNING,
"[%8llu, %d] Not enough memory for flow info\n",
workflow->packets_captured,
@@ -1185,7 +1349,8 @@ static void ndpi_process_packet(uint8_t * const args,
flow_to_process->flow_id = flow_id++;
flow_to_process->ndpi_flow = (struct ndpi_flow_struct *)ndpi_flow_malloc(SIZEOF_FLOW_STRUCT);
- if (flow_to_process->ndpi_flow == NULL) {
+ if (flow_to_process->ndpi_flow == NULL)
+ {
syslog(LOG_DAEMON | LOG_WARNING,
"[%8llu, %d, %4u] Not enough memory for flow struct\n",
workflow->packets_captured,
@@ -1196,7 +1361,8 @@ static void ndpi_process_packet(uint8_t * const args,
memset(flow_to_process->ndpi_flow, 0, SIZEOF_FLOW_STRUCT);
flow_to_process->ndpi_src = (struct ndpi_id_struct *)ndpi_calloc(1, SIZEOF_ID_STRUCT);
- if (flow_to_process->ndpi_src == NULL) {
+ if (flow_to_process->ndpi_src == NULL)
+ {
syslog(LOG_DAEMON | LOG_WARNING,
"[%8llu, %d, %4u] Not enough memory for src id struct\n",
workflow->packets_captured,
@@ -1206,7 +1372,8 @@ static void ndpi_process_packet(uint8_t * const args,
}
flow_to_process->ndpi_dst = (struct ndpi_id_struct *)ndpi_calloc(1, SIZEOF_ID_STRUCT);
- if (flow_to_process->ndpi_dst == NULL) {
+ if (flow_to_process->ndpi_dst == NULL)
+ {
syslog(LOG_DAEMON | LOG_WARNING,
"[%8llu, %d, %4u] Not enough memory for dst id struct\n",
workflow->packets_captured,
@@ -1221,7 +1388,8 @@ static void ndpi_process_packet(uint8_t * const args,
flow_to_process->flow_id,
(flow_to_process->is_midstream_flow != 0 ? "midstream-" : ""));
#endif
- if (ndpi_tsearch(flow_to_process, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp) == NULL) {
+ if (ndpi_tsearch(flow_to_process, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp) == NULL)
+ {
/* Possible Leak, but should not happen as we'd abort earlier. */
return;
}
@@ -1232,13 +1400,18 @@ static void ndpi_process_packet(uint8_t * const args,
#ifndef DISABLE_JSONIZER
jsonize_flow_event(reader_thread, flow_to_process, FLOW_NEW);
#endif
- } else {
+ }
+ else
+ {
flow_to_process = *(struct nDPId_flow_info **)tree_result;
- if (direction_changed != 0) {
+ if (direction_changed != 0)
+ {
ndpi_src = flow_to_process->ndpi_dst;
ndpi_dst = flow_to_process->ndpi_src;
- } else {
+ }
+ else
+ {
ndpi_src = flow_to_process->ndpi_src;
ndpi_dst = flow_to_process->ndpi_dst;
}
@@ -1247,7 +1420,8 @@ static void ndpi_process_packet(uint8_t * const args,
flow_to_process->packets_processed++;
flow_to_process->total_l4_data_len += l4_len;
/* update timestamps, important for timeout handling */
- if (flow_to_process->first_seen == 0) {
+ if (flow_to_process->first_seen == 0)
+ {
flow_to_process->first_seen = time_ms;
}
flow_to_process->last_seen = time_ms;
@@ -1255,7 +1429,8 @@ static void ndpi_process_packet(uint8_t * const args,
flow_to_process->flow_ack_seen = flow.flow_ack_seen;
/* TCP-FIN: indicates that at least one side wants to end the connection */
- if (flow.flow_fin_ack_seen != 0 && flow_to_process->flow_fin_ack_seen == 0) {
+ if (flow.flow_fin_ack_seen != 0 && flow_to_process->flow_fin_ack_seen == 0)
+ {
flow_to_process->flow_fin_ack_seen = 1;
#ifdef DISABLE_JSONIZER
printf("[%8llu, %d, %4u] end of flow\n", workflow->packets_captured, thread_index, flow_to_process->flow_id);
@@ -1265,17 +1440,23 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
- if (l4_len > flow_to_process->max_l4_data_len) {
+ if (l4_len > flow_to_process->max_l4_data_len)
+ {
flow_to_process->max_l4_data_len = l4_len;
}
- if (l4_len < flow_to_process->min_l4_data_len) {
+ if (l4_len < flow_to_process->min_l4_data_len)
+ {
flow_to_process->min_l4_data_len = l4_len;
}
- if (flow_to_process->ndpi_flow->num_processed_pkts == 0xFF) {
+ if (flow_to_process->ndpi_flow->num_processed_pkts == 0xFF)
+ {
return;
- } else if (flow_to_process->ndpi_flow->num_processed_pkts == 0xFE) {
- if (flow_to_process->detection_completed != 0) {
+ }
+ else if (flow_to_process->ndpi_flow->num_processed_pkts == 0xFE)
+ {
+ if (flow_to_process->detection_completed != 0)
+ {
#ifdef DISABLE_JSONIZER
printf("[%8llu, %d, %4d][DETECTED] protocol: %s | app protocol: %s | category: %s\n",
workflow->packets_captured,
@@ -1287,12 +1468,15 @@ static void ndpi_process_packet(uint8_t * const args,
#else
jsonize_flow_event(reader_thread, flow_to_process, FLOW_DETECTED);
#endif
- } else {
+ }
+ else
+ {
/* last chance to guess something, better then nothing */
uint8_t protocol_was_guessed = 0;
flow_to_process->guessed_protocol =
ndpi_detection_giveup(workflow->ndpi_struct, flow_to_process->ndpi_flow, 1, &protocol_was_guessed);
- if (protocol_was_guessed != 0) {
+ if (protocol_was_guessed != 0)
+ {
#ifdef DISABLE_JSONIZER
printf("[%8llu, %d, %4d][GUESSED] protocol: %s | app protocol: %s | category: %s\n",
workflow->packets_captured,
@@ -1304,7 +1488,9 @@ static void ndpi_process_packet(uint8_t * const args,
#else
jsonize_flow_event(reader_thread, flow_to_process, FLOW_GUESSED);
#endif
- } else {
+ }
+ else
+ {
#ifdef DISABLE_JSONIZER
printf("[%8llu, %d, %4d][FLOW NOT DETECTED]\n",
workflow->packets_captured,
@@ -1326,9 +1512,11 @@ static void ndpi_process_packet(uint8_t * const args,
ndpi_dst);
if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_to_process->detected_l7_protocol) != 0 &&
- flow_to_process->detection_completed == 0) {
+ flow_to_process->detection_completed == 0)
+ {
if (flow_to_process->detected_l7_protocol.master_protocol != NDPI_PROTOCOL_UNKNOWN ||
- flow_to_process->detected_l7_protocol.app_protocol != NDPI_PROTOCOL_UNKNOWN) {
+ flow_to_process->detected_l7_protocol.app_protocol != NDPI_PROTOCOL_UNKNOWN)
+ {
flow_to_process->detection_completed = 1;
workflow->detected_flow_protocols++;
#ifdef DISABLE_JSONIZER
@@ -1348,13 +1536,16 @@ static void ndpi_process_packet(uint8_t * const args,
static void run_pcap_loop(struct nDPId_reader_thread const * const reader_thread)
{
- if (reader_thread->workflow != NULL && reader_thread->workflow->pcap_handle != NULL) {
+ if (reader_thread->workflow != NULL && reader_thread->workflow->pcap_handle != NULL)
+ {
if (pcap_loop(reader_thread->workflow->pcap_handle, -1, &ndpi_process_packet, (uint8_t *)reader_thread) ==
- PCAP_ERROR) {
+ PCAP_ERROR)
+ {
syslog(LOG_DAEMON | LOG_ERR,
- "Error while reading pcap file: '%s'\n", pcap_geterr(reader_thread->workflow->pcap_handle));
+ "Error while reading pcap file: '%s'\n",
+ pcap_geterr(reader_thread->workflow->pcap_handle));
reader_thread->workflow->error_or_eof = 1;
}
}
@@ -1362,7 +1553,8 @@ static void run_pcap_loop(struct nDPId_reader_thread const * const reader_thread
static void break_pcap_loop(struct nDPId_reader_thread * const reader_thread)
{
- if (reader_thread->workflow != NULL && reader_thread->workflow->pcap_handle != NULL) {
+ if (reader_thread->workflow != NULL && reader_thread->workflow->pcap_handle != NULL)
+ {
pcap_breakloop(reader_thread->workflow->pcap_handle);
}
}
@@ -1372,7 +1564,8 @@ static void * processing_thread(void * const ndpi_thread_arg)
struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)ndpi_thread_arg;
#ifndef DISABLE_JSONIZER
- if (connect_to_json_socket(reader_thread) != 0) {
+ if (connect_to_json_socket(reader_thread) != 0)
+ {
syslog(LOG_DAEMON | LOG_ERR,
"Thread %u: Could not connect to JSON sink, will try again later",
reader_thread->array_index);
@@ -1385,8 +1578,10 @@ static void * processing_thread(void * const ndpi_thread_arg)
static int processing_threads_error_or_eof(void)
{
- for (int i = 0; i < reader_thread_count; ++i) {
- if (reader_threads[i].workflow->error_or_eof == 0) {
+ for (int i = 0; i < reader_thread_count; ++i)
+ {
+ if (reader_threads[i].workflow->error_or_eof == 0)
+ {
return 0;
}
}
@@ -1400,26 +1595,31 @@ static int start_reader_threads(void)
sigfillset(&thread_signal_set);
sigdelset(&thread_signal_set, SIGINT);
sigdelset(&thread_signal_set, SIGTERM);
- if (pthread_sigmask(SIG_BLOCK, &thread_signal_set, &old_signal_set) != 0) {
+ if (pthread_sigmask(SIG_BLOCK, &thread_signal_set, &old_signal_set) != 0)
+ {
fprintf(stderr, "pthread_sigmask: %s\n", strerror(errno));
return 1;
}
- for (int i = 0; i < reader_thread_count; ++i) {
+ for (int i = 0; i < reader_thread_count; ++i)
+ {
reader_threads[i].array_index = i;
- if (reader_threads[i].workflow == NULL) {
+ if (reader_threads[i].workflow == NULL)
+ {
/* no more threads should be started */
break;
}
- if (pthread_create(&reader_threads[i].thread_id, NULL, processing_thread, &reader_threads[i]) != 0) {
+ if (pthread_create(&reader_threads[i].thread_id, NULL, processing_thread, &reader_threads[i]) != 0)
+ {
fprintf(stderr, "pthread_create: %s\n", strerror(errno));
return 1;
}
}
- if (pthread_sigmask(SIG_BLOCK, &old_signal_set, NULL) != 0) {
+ if (pthread_sigmask(SIG_BLOCK, &old_signal_set, NULL) != 0)
+ {
fprintf(stderr, "pthread_sigmask: %s\n", strerror(errno));
return 1;
}
@@ -1435,14 +1635,17 @@ static int stop_reader_threads(void)
unsigned long long int total_flows_idle = 0;
unsigned long long int total_flows_detected = 0;
- for (int i = 0; i < reader_thread_count; ++i) {
+ for (int i = 0; i < reader_thread_count; ++i)
+ {
break_pcap_loop(&reader_threads[i]);
}
printf("------------------------------------ Stopping reader threads\n");
- for (int i = 0; i < reader_thread_count; ++i) {
- if (reader_threads[i].workflow == NULL) {
+ for (int i = 0; i < reader_thread_count; ++i)
+ {
+ if (reader_threads[i].workflow == NULL)
+ {
continue;
}
@@ -1470,12 +1673,15 @@ static int stop_reader_threads(void)
printf("Total flows timed out..: %llu\n", total_flows_idle);
printf("Total flows detected...: %llu\n", total_flows_detected);
- for (int i = 0; i < reader_thread_count; ++i) {
- if (reader_threads[i].workflow == NULL) {
+ for (int i = 0; i < reader_thread_count; ++i)
+ {
+ if (reader_threads[i].workflow == NULL)
+ {
continue;
}
- if (pthread_join(reader_threads[i].thread_id, NULL) != 0) {
+ if (pthread_join(reader_threads[i].thread_id, NULL) != 0)
+ {
syslog(LOG_DAEMON | LOG_ERR, "pthread_join: %s\n", strerror(errno));
}
@@ -1489,13 +1695,17 @@ static void sighandler(int signum)
{
syslog(LOG_DAEMON | LOG_NOTICE, "Received SIGNAL %d\n", signum);
- if (main_thread_shutdown == 0) {
+ if (main_thread_shutdown == 0)
+ {
main_thread_shutdown = 1;
- if (stop_reader_threads() != 0) {
+ if (stop_reader_threads() != 0)
+ {
syslog(LOG_DAEMON | LOG_ERR, "Failed to stop reader threads!\n");
exit(EXIT_FAILURE);
}
- } else {
+ }
+ else
+ {
syslog(LOG_DAEMON | LOG_NOTICE, "Reader threads are already shutting down, please be patient.\n");
}
}
@@ -1504,8 +1714,10 @@ static int parse_options(int argc, char ** argv)
{
int opt;
- while ((opt = getopt(argc, argv, "hi:lc:")) != -1) {
- switch (opt) {
+ while ((opt = getopt(argc, argv, "hi:lc:")) != -1)
+ {
+ switch (opt)
+ {
case 'i':
pcap_file_or_interface = strdup(optarg);
break;
@@ -1531,11 +1743,13 @@ static int parse_options(int argc, char ** argv)
int main(int argc, char ** argv)
{
- if (argc == 0) {
+ if (argc == 0)
+ {
return 1;
}
- if (parse_options(argc, argv) != 0) {
+ if (parse_options(argc, argv) != 0)
+ {
return 1;
}
@@ -1551,23 +1765,27 @@ int main(int argc, char ** argv)
openlog("nDPId", LOG_CONS | (log_to_stderr != 0 ? LOG_PERROR : 0), LOG_DAEMON);
- if (setup_reader_threads(pcap_file_or_interface) != 0) {
+ if (setup_reader_threads(pcap_file_or_interface) != 0)
+ {
fprintf(stderr, "%s: setup_reader_threads failed\n", argv[0]);
return 1;
}
- if (start_reader_threads() != 0) {
+ if (start_reader_threads() != 0)
+ {
fprintf(stderr, "%s: start_reader_threads\n", argv[0]);
return 1;
}
signal(SIGINT, sighandler);
signal(SIGTERM, sighandler);
- while (main_thread_shutdown == 0 && processing_threads_error_or_eof() == 0) {
+ while (main_thread_shutdown == 0 && processing_threads_error_or_eof() == 0)
+ {
sleep(1);
}
- if (main_thread_shutdown == 0 && stop_reader_threads() != 0) {
+ if (main_thread_shutdown == 0 && stop_reader_threads() != 0)
+ {
fprintf(stderr, "%s: stop_reader_threads\n", argv[0]);
return 1;
}