aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--example/ndpiReader.c92
-rw-r--r--src/include/ndpi_util.h33
-rw-r--r--src/lib/ndpi_util.c123
3 files changed, 147 insertions, 101 deletions
diff --git a/example/ndpiReader.c b/example/ndpiReader.c
index d86fb3f2b..0a1747088 100644
--- a/example/ndpiReader.c
+++ b/example/ndpiReader.c
@@ -109,6 +109,10 @@ static u_int32_t num_flows;
struct reader_thread {
struct ndpi_workflow * workflow;
pthread_t pthread;
+ u_int64_t last_idle_scan_time;
+ u_int32_t idle_scan_idx;
+ u_int32_t num_idle_flows;
+ struct ndpi_flow_info *idle_flows[IDLE_SCAN_BUDGET];
};
static struct reader_thread ndpi_thread_info[MAX_NUM_READER_THREADS];
@@ -553,7 +557,7 @@ static void node_idle_scan_walker(const void *node, ndpi_VISIT which, int depth,
struct ndpi_flow_info *flow = *(struct ndpi_flow_info **) node;
u_int16_t thread_id = *((u_int16_t *) user_data);
- if(ndpi_thread_info[thread_id].workflow->num_idle_flows == IDLE_SCAN_BUDGET) /* TODO optimise with a budget-based walk */
+ if(ndpi_thread_info[thread_id].num_idle_flows == IDLE_SCAN_BUDGET) /* TODO optimise with a budget-based walk */
return;
if((which == ndpi_preorder) || (which == ndpi_leaf)) { /* Avoid walking the same node multiple times */
@@ -569,13 +573,69 @@ static void node_idle_scan_walker(const void *node, ndpi_VISIT which, int depth,
ndpi_thread_info[thread_id].workflow->stats.ndpi_flow_count--;
/* adding to a queue (we can't delete it from the tree inline ) */
- ndpi_thread_info[thread_id].workflow->idle_flows[ndpi_thread_info[thread_id].workflow->num_idle_flows++] = flow;
+ ndpi_thread_info[thread_id].idle_flows[ndpi_thread_info[thread_id].num_idle_flows++] = flow;
}
}
}
/* ***************************************************** */
+static void on_protocol_discovered(struct ndpi_workflow * workflow,
+ struct ndpi_flow_info * flow,
+ void * udata) {
+ const u_int16_t thread_id = *((u_int16_t *)udata);
+
+ if(verbose > 1) {
+ if(enable_protocol_guess) {
+ if(flow->detected_protocol.protocol == NDPI_PROTOCOL_UNKNOWN) {
+ flow->detected_protocol.protocol = node_guess_undetected_protocol(thread_id, flow),
+ flow->detected_protocol.master_protocol = NDPI_PROTOCOL_UNKNOWN;
+ }
+ }
+
+ printFlow(thread_id, flow);
+ }
+}
+
+/* ***************************************************** */
+
+static void debug_printf(u_int32_t protocol, void *id_struct,
+ ndpi_log_level_t log_level,
+ const char *format, ...) {
+ va_list va_ap;
+#ifndef WIN32
+ struct tm result;
+#endif
+
+ if(log_level <= nDPI_traceLevel) {
+ char buf[8192], out_buf[8192];
+ char theDate[32];
+ const char *extra_msg = "";
+ time_t theTime = time(NULL);
+
+ va_start (va_ap, format);
+
+ if(log_level == NDPI_LOG_ERROR)
+ extra_msg = "ERROR: ";
+ else if(log_level == NDPI_LOG_TRACE)
+ extra_msg = "TRACE: ";
+ else
+ extra_msg = "DEBUG: ";
+
+ memset(buf, 0, sizeof(buf));
+ strftime(theDate, 32, "%d/%b/%Y %H:%M:%S", localtime_r(&theTime,&result) );
+ vsnprintf(buf, sizeof(buf)-1, format, va_ap);
+
+ snprintf(out_buf, sizeof(out_buf), "%s %s%s", theDate, extra_msg, buf);
+ printf("%s", out_buf);
+ fflush(stdout);
+ }
+
+ va_end(va_ap);
+}
+
+/* ***************************************************** */
+
static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle) {
NDPI_PROTOCOL_BITMASK all;
@@ -588,8 +648,10 @@ static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle) {
prefs.detection_tick_resolution = detection_tick_resolution;
memset(&ndpi_thread_info[thread_id], 0, sizeof(ndpi_thread_info[thread_id]));
- ndpi_thread_info[thread_id].workflow = ndpi_workflow_init(&prefs, pcap_handle, malloc_wrapper, free_wrapper);
+ ndpi_thread_info[thread_id].workflow = ndpi_workflow_init(&prefs, pcap_handle, malloc_wrapper, free_wrapper, debug_printf);
/* ndpi_thread_info[thread_id].workflow->ndpi_struct->http_dont_dissect_response = 1; */
+
+ ndpi_workflow_set_flow_detected_callback(ndpi_thread_info[thread_id].workflow, on_protocol_discovered, (void *)&thread_id);
// enable all protocols
NDPI_BITMASK_SET_ALL(all);
@@ -1078,6 +1140,30 @@ static void pcap_packet_callback_checked(u_char *args,
pcap_end.tv_sec = header->ts.tv_sec, pcap_end.tv_usec = header->ts.tv_usec;
}
+ /* Idle flows cleanup */
+ if(live_capture) {
+ if(ndpi_thread_info[thread_id].last_idle_scan_time + IDLE_SCAN_PERIOD < ndpi_thread_info[thread_id].workflow->last_time) {
+ /* scan for idle flows */
+ ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[ndpi_thread_info[thread_id].idle_scan_idx], node_idle_scan_walker, &thread_id);
+
+ /* remove idle flows (unfortunately we cannot do this inline) */
+ while (ndpi_thread_info[thread_id].num_idle_flows > 0) {
+
+ /* search and delete the idle flow from the "ndpi_flow_root" (see struct reader thread) - here flows are the node of a b-tree */
+ ndpi_tdelete(ndpi_thread_info[thread_id].idle_flows[--ndpi_thread_info[thread_id].num_idle_flows],
+ &ndpi_thread_info[thread_id].workflow->ndpi_flows_root[ndpi_thread_info[thread_id].idle_scan_idx],
+ ndpi_workflow_node_cmp);
+
+ /* free the memory associated to idle flow in "idle_flows" - (see struct reader thread)*/
+ free_ndpi_flow_info(ndpi_thread_info[thread_id].idle_flows[ndpi_thread_info[thread_id].num_idle_flows]);
+ ndpi_free(ndpi_thread_info[thread_id].idle_flows[ndpi_thread_info[thread_id].num_idle_flows]);
+ }
+
+ if(++ndpi_thread_info[thread_id].idle_scan_idx == ndpi_thread_info[thread_id].workflow->prefs.num_roots) ndpi_thread_info[thread_id].idle_scan_idx = 0;
+ ndpi_thread_info[thread_id].last_idle_scan_time = ndpi_thread_info[thread_id].workflow->last_time;
+ }
+ }
+
/* check for buffer changes */
if(memcmp(packet, packet_checked, header->caplen) != 0)
printf("INTERNAL ERROR: ingress packet was nodified by nDPI: this should not happen [thread_id=%u, packetId=%lu]\n",
diff --git a/src/include/ndpi_util.h b/src/include/ndpi_util.h
index c7d1e6745..daa564b69 100644
--- a/src/include/ndpi_util.h
+++ b/src/include/ndpi_util.h
@@ -87,20 +87,25 @@ typedef struct ndpi_workflow_prefs {
u_int32_t detection_tick_resolution;
} ndpi_workflow_prefs_t;
+struct ndpi_workflow;
+/** workflow, flow, user data */
+typedef void (*ndpi_workflow_callback_ptr) (struct ndpi_workflow *, struct ndpi_flow_info *, void *);
+
typedef struct ndpi_workflow {
u_int64_t last_time;
- u_int64_t last_idle_scan_time;
- u_int32_t idle_scan_idx;
- u_int32_t num_idle_flows; /* TODO_EMA decide if idle flows will be handled */
struct ndpi_workflow_prefs prefs;
struct ndpi_stats stats;
+ ndpi_workflow_callback_ptr __flow_detected_callback;
+ void * __flow_detected_udata;
+ ndpi_workflow_callback_ptr __flow_giveup_callback;
+ void * __flow_giveup_udata;
+
/* outside referencies */
pcap_t *pcap_handle;
/* allocated by prefs */
- struct ndpi_flow_info **idle_flows;
void **ndpi_flows_root;
struct ndpi_detection_module_struct *ndpi_struct;
} ndpi_workflow_t;
@@ -109,7 +114,8 @@ typedef struct ndpi_workflow {
struct ndpi_workflow * ndpi_workflow_init(const struct ndpi_workflow_prefs * prefs,
pcap_t * pcap_handle,
void * (*malloc_wrapper)(size_t),
- void (*free_wrapper)(void*));
+ void (*free_wrapper)(void*),
+ ndpi_debug_function_ptr ndpi_debug_printf);
void ndpi_workflow_free(struct ndpi_workflow * workflow);
@@ -117,5 +123,22 @@ void ndpi_workflow_free(struct ndpi_workflow * workflow);
void ndpi_workflow_process_packet (struct ndpi_workflow * workflow,
const struct pcap_pkthdr *header,
const u_char *packet);
+
+/* flow callbacks: ndpi_flow_info will be freed right after */
+inline void ndpi_workflow_set_flow_detected_callback(struct ndpi_workflow * workflow,
+ ndpi_workflow_callback_ptr callback,
+ void * udata) {
+ workflow->__flow_detected_callback = callback;
+ workflow->__flow_detected_udata = udata;
+}
+
+inline void ndpi_workflow_set_flow_giveup_callback(struct ndpi_workflow * workflow,
+ ndpi_workflow_callback_ptr callback,
+ void * udata) {
+ workflow->__flow_giveup_callback = callback;
+ workflow->__flow_giveup_udata = udata;
+}
+
+int ndpi_workflow_node_cmp(const void *a, const void *b);
#endif
diff --git a/src/lib/ndpi_util.c b/src/lib/ndpi_util.c
index 5899c81a1..51d6757e1 100644
--- a/src/lib/ndpi_util.c
+++ b/src/lib/ndpi_util.c
@@ -69,10 +69,6 @@
#include "ndpi_main.h"
#include "ndpi_util.h"
-/* TODO_EMA printf replace with API */
-/* TODO_EMA */
-#define json_flag 0
-
/* ***************************************************** */
static void free_ndpi_flow_info(struct ndpi_flow_info *flow) {
@@ -86,41 +82,6 @@ static void free_ndpi_flow_info(struct ndpi_flow_info *flow) {
static const u_int8_t nDPI_traceLevel = 0;
-static void debug_printf(u_int32_t protocol, void *id_struct,
- ndpi_log_level_t log_level,
- const char *format, ...) {
- va_list va_ap;
-#ifndef WIN32
- struct tm result;
-#endif
-
- if(log_level <= nDPI_traceLevel) {
- char buf[8192], out_buf[8192];
- char theDate[32];
- const char *extra_msg = "";
- time_t theTime = time(NULL);
-
- va_start (va_ap, format);
-
- if(log_level == NDPI_LOG_ERROR)
- extra_msg = "ERROR: ";
- else if(log_level == NDPI_LOG_TRACE)
- extra_msg = "TRACE: ";
- else
- extra_msg = "DEBUG: ";
-
- memset(buf, 0, sizeof(buf));
- strftime(theDate, 32, "%d/%b/%Y %H:%M:%S", localtime_r(&theTime,&result) );
- vsnprintf(buf, sizeof(buf)-1, format, va_ap);
-
- snprintf(out_buf, sizeof(out_buf), "%s %s%s", theDate, extra_msg, buf);
- printf("%s", out_buf);
- fflush(stdout);
- }
-
- va_end(va_ap);
-}
-
/* ***************************************************** */
/* TODO remove in future... */
static void (*removeme_free_wrapper)(void*);
@@ -128,11 +89,12 @@ static void (*removeme_free_wrapper)(void*);
struct ndpi_workflow * ndpi_workflow_init(const struct ndpi_workflow_prefs * prefs,
pcap_t * pcap_handle,
void * (*malloc_wrapper)(size_t),
- void (*free_wrapper)(void*)) {
+ void (*free_wrapper)(void*),
+ ndpi_debug_function_ptr ndpi_debug_printf) {
/* TODO: just needed here to init ndpi malloc wrapper */
struct ndpi_detection_module_struct * module = ndpi_init_detection_module(
- prefs->detection_tick_resolution, malloc_wrapper, free_wrapper, debug_printf);
+ prefs->detection_tick_resolution, malloc_wrapper, free_wrapper, ndpi_debug_printf);
struct ndpi_workflow * workflow = ndpi_calloc(1, sizeof(struct ndpi_workflow));
@@ -143,11 +105,10 @@ struct ndpi_workflow * ndpi_workflow_init(const struct ndpi_workflow_prefs * pre
workflow->ndpi_struct = module;
if(workflow->ndpi_struct == NULL) {
- printf("ERROR: global structure initialization failed\n");
+ NDPI_LOG(0, NULL, NDPI_LOG_ERROR, "global structure initialization failed\n");
exit(-1);
}
- workflow->idle_flows = ndpi_calloc(workflow->num_idle_flows, sizeof(struct ndpi_flow_info *));
workflow->ndpi_flows_root = ndpi_calloc(workflow->prefs.num_roots, sizeof(void *));
return workflow;
}
@@ -155,13 +116,12 @@ struct ndpi_workflow * ndpi_workflow_init(const struct ndpi_workflow_prefs * pre
void ndpi_workflow_free(struct ndpi_workflow * workflow) {
ndpi_exit_detection_module(workflow->ndpi_struct, removeme_free_wrapper);
free(workflow->ndpi_flows_root);
- free(workflow->idle_flows);
free(workflow);
}
/* ***************************************************** */
-static int node_cmp(const void *a, const void *b) {
+int ndpi_workflow_node_cmp(const void *a, const void *b) {
struct ndpi_flow_info *fa = (struct ndpi_flow_info*)a;
struct ndpi_flow_info *fb = (struct ndpi_flow_info*)b;
@@ -318,21 +278,21 @@ static struct ndpi_flow_info *get_ndpi_flow_info(struct ndpi_workflow * workflow
flow.lower_port = lower_port, flow.upper_port = upper_port;
if(0)
- printf("[NDPI] [%u][%u:%u <-> %u:%u]\n",
+ NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_DEBUG, "[NDPI] [%u][%u:%u <-> %u:%u]\n",
iph->protocol, lower_ip, ntohs(lower_port), upper_ip, ntohs(upper_port));
idx = (vlan_id + lower_ip + upper_ip + iph->protocol + lower_port + upper_port) % workflow->prefs.num_roots;
- ret = ndpi_tfind(&flow, &workflow->ndpi_flows_root[idx], node_cmp);
+ ret = ndpi_tfind(&flow, &workflow->ndpi_flows_root[idx], ndpi_workflow_node_cmp);
if(ret == NULL) {
if(workflow->stats.ndpi_flow_count == workflow->prefs.max_ndpi_flows) {
- printf("ERROR: maximum flow count (%u) has been exceeded\n", workflow->prefs.max_ndpi_flows);
+ NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_ERROR, "maximum flow count (%u) has been exceeded\n", workflow->prefs.max_ndpi_flows);
exit(-1);
} else {
struct ndpi_flow_info *newflow = (struct ndpi_flow_info*)malloc(sizeof(struct ndpi_flow_info));
if(newflow == NULL) {
- printf("[NDPI] %s(1): not enough memory\n", __FUNCTION__);
+ NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_ERROR, "[NDPI] %s(1): not enough memory\n", __FUNCTION__);
return(NULL);
}
@@ -351,27 +311,27 @@ static struct ndpi_flow_info *get_ndpi_flow_info(struct ndpi_workflow * workflow
}
if((newflow->ndpi_flow = ndpi_malloc(SIZEOF_FLOW_STRUCT)) == NULL) {
- printf("[NDPI] %s(2): not enough memory\n", __FUNCTION__);
+ NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_ERROR, "[NDPI] %s(2): not enough memory\n", __FUNCTION__);
free(newflow);
return(NULL);
} else
memset(newflow->ndpi_flow, 0, SIZEOF_FLOW_STRUCT);
if((newflow->src_id = ndpi_malloc(SIZEOF_ID_STRUCT)) == NULL) {
- printf("[NDPI] %s(3): not enough memory\n", __FUNCTION__);
+ NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_ERROR, "[NDPI] %s(3): not enough memory\n", __FUNCTION__);
free(newflow);
return(NULL);
} else
memset(newflow->src_id, 0, SIZEOF_ID_STRUCT);
if((newflow->dst_id = ndpi_malloc(SIZEOF_ID_STRUCT)) == NULL) {
- printf("[NDPI] %s(4): not enough memory\n", __FUNCTION__);
+ NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_ERROR, "[NDPI] %s(4): not enough memory\n", __FUNCTION__);
free(newflow);
return(NULL);
} else
memset(newflow->dst_id, 0, SIZEOF_ID_STRUCT);
- ndpi_tsearch(newflow, &workflow->ndpi_flows_root[idx], node_cmp); /* Add */
+ ndpi_tsearch(newflow, &workflow->ndpi_flows_root[idx], ndpi_workflow_node_cmp); /* Add */
workflow->stats.ndpi_flow_count++;
*src = newflow->src_id, *dst = newflow->dst_id;
@@ -470,6 +430,7 @@ static unsigned int packet_processing(struct ndpi_workflow * workflow,
return(0);
}
+ /* Protocol already detected */
if(flow->detection_completed) return(0);
flow->detected_protocol = ndpi_detection_process_packet(workflow->ndpi_struct, ndpi_flow,
@@ -479,6 +440,7 @@ static unsigned int packet_processing(struct ndpi_workflow * workflow,
if((flow->detected_protocol.protocol != NDPI_PROTOCOL_UNKNOWN)
|| ((proto == IPPROTO_UDP) && (flow->packets > 8))
|| ((proto == IPPROTO_TCP) && (flow->packets > 10))) {
+ /* New protocol detected or give up */
flow->detection_completed = 1;
if((flow->detected_protocol.protocol == NDPI_PROTOCOL_UNKNOWN) && (ndpi_flow->num_stun_udp_pkts > 0))
@@ -502,46 +464,19 @@ static unsigned int packet_processing(struct ndpi_workflow * workflow,
snprintf(flow->ssl.server_certificate, sizeof(flow->ssl.server_certificate), "%s", flow->ndpi_flow->protos.ssl.server_certificate);
}
- if(flow->detected_protocol.protocol == NDPI_PROTOCOL_UNKNOWN)
+ if(flow->detected_protocol.protocol == NDPI_PROTOCOL_UNKNOWN) {
flow->detected_protocol = ndpi_detection_giveup(workflow->ndpi_struct, flow->ndpi_flow);
-
+
+ if (workflow->__flow_giveup_callback != NULL)
+ workflow->__flow_giveup_callback(workflow, flow, workflow->__flow_giveup_udata);
+ } else {
+ if (workflow->__flow_detected_callback != NULL)
+ workflow->__flow_detected_callback(workflow, flow, workflow->__flow_detected_udata);
+ }
+
free_ndpi_flow_info(flow);
-
- /* TODO_EMA */
- /*if(verbose > 1) {
- if(enable_protocol_guess) {
- if(flow->detected_protocol.protocol == NDPI_PROTOCOL_UNKNOWN) {
- flow->detected_protocol.protocol = node_guess_undetected_protocol(thread_id, flow),
- flow->detected_protocol.master_protocol = NDPI_PROTOCOL_UNKNOWN;
- }
- }
-
- printFlow(thread_id, flow);
- }*/
}
- /* TODO_EMA use a callback */
- //~ if(live_capture) {
- //~ if(ndpi_thread_info[thread_id].last_idle_scan_time + IDLE_SCAN_PERIOD < ndpi_thread_info[thread_id].last_time) {
- //~ /* scan for idle flows */
- //~ ndpi_twalk(workflow->ndpi_flows_root[ndpi_thread_info[thread_id].idle_scan_idx], node_idle_scan_walker, &thread_id);
-
- //~ /* remove idle flows (unfortunately we cannot do this inline) */
- //~ while (ndpi_thread_info[thread_id].num_idle_flows > 0) {
-
- //~ /* search and delete the idle flow from the "ndpi_flow_root" (see struct reader thread) - here flows are the node of a b-tree */
- //~ ndpi_tdelete(ndpi_thread_info[thread_id].idle_flows[--ndpi_thread_info[thread_id].num_idle_flows], &workflow->ndpi_flows_root[ndpi_thread_info[thread_id].idle_scan_idx], node_cmp);
-
- //~ /* free the memory associated to idle flow in "idle_flows" - (see struct reader thread)*/
- //~ free_ndpi_flow_info(ndpi_thread_info[thread_id].idle_flows[ndpi_thread_info[thread_id].num_idle_flows]);
- //~ ndpi_free(ndpi_thread_info[thread_id].idle_flows[ndpi_thread_info[thread_id].num_idle_flows]);
- //~ }
-
- //~ if(++ndpi_thread_info[thread_id].idle_scan_idx == workflow->prefs.num_roots) ndpi_thread_info[thread_id].idle_scan_idx = 0;
- //~ ndpi_thread_info[thread_id].last_idle_scan_time = ndpi_thread_info[thread_id].last_time;
- //~ }
- //~ }
-
return 0;
}
@@ -763,7 +698,8 @@ void ndpi_workflow_process_packet (struct ndpi_workflow * workflow,
static u_int8_t cap_warning_used = 0;
if(cap_warning_used == 0) {
- if((!json_flag) && (!workflow->prefs.quiet_mode)) printf("\n\nWARNING: packet capture size is smaller than packet size, DETECTION MIGHT NOT WORK CORRECTLY\n\n");
+ if(!workflow->prefs.quiet_mode)
+ NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_DEBUG, "\n\nWARNING: packet capture size is smaller than packet size, DETECTION MIGHT NOT WORK CORRECTLY\n\n");
cap_warning_used = 1;
}
}
@@ -783,7 +719,8 @@ void ndpi_workflow_process_packet (struct ndpi_workflow * workflow,
workflow->stats.fragmented_count++;
if(ipv4_frags_warning_used == 0) {
- if((!json_flag) && (!workflow->prefs.quiet_mode)) printf("\n\nWARNING: IPv4 fragments are not handled by this demo (nDPI supports them)\n");
+ if(!workflow->prefs.quiet_mode)
+ NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_DEBUG, "\n\nWARNING: IPv4 fragments are not handled by this demo (nDPI supports them)\n");
ipv4_frags_warning_used = 1;
}
@@ -808,8 +745,8 @@ void ndpi_workflow_process_packet (struct ndpi_workflow * workflow,
v4_warning:
if(ipv4_warning_used == 0) {
- if((!json_flag) && (!workflow->prefs.quiet_mode))
- printf("\n\nWARNING: only IPv4/IPv6 packets are supported in this demo (nDPI supports both IPv4 and IPv6), all other packets will be discarded\n\n");
+ if(!workflow->prefs.quiet_mode)
+ NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_DEBUG, "\n\nWARNING: only IPv4/IPv6 packets are supported in this demo (nDPI supports both IPv4 and IPv6), all other packets will be discarded\n\n");
ipv4_warning_used = 1;
}
workflow->stats.total_discarded_bytes += header->len;