diff options
-rw-r--r-- | example/ndpiReader.c | 92 | ||||
-rw-r--r-- | src/include/ndpi_util.h | 33 | ||||
-rw-r--r-- | src/lib/ndpi_util.c | 123 |
3 files changed, 147 insertions, 101 deletions
diff --git a/example/ndpiReader.c b/example/ndpiReader.c index d86fb3f2b..0a1747088 100644 --- a/example/ndpiReader.c +++ b/example/ndpiReader.c @@ -109,6 +109,10 @@ static u_int32_t num_flows; struct reader_thread { struct ndpi_workflow * workflow; pthread_t pthread; + u_int64_t last_idle_scan_time; + u_int32_t idle_scan_idx; + u_int32_t num_idle_flows; + struct ndpi_flow_info *idle_flows[IDLE_SCAN_BUDGET]; }; static struct reader_thread ndpi_thread_info[MAX_NUM_READER_THREADS]; @@ -553,7 +557,7 @@ static void node_idle_scan_walker(const void *node, ndpi_VISIT which, int depth, struct ndpi_flow_info *flow = *(struct ndpi_flow_info **) node; u_int16_t thread_id = *((u_int16_t *) user_data); - if(ndpi_thread_info[thread_id].workflow->num_idle_flows == IDLE_SCAN_BUDGET) /* TODO optimise with a budget-based walk */ + if(ndpi_thread_info[thread_id].num_idle_flows == IDLE_SCAN_BUDGET) /* TODO optimise with a budget-based walk */ return; if((which == ndpi_preorder) || (which == ndpi_leaf)) { /* Avoid walking the same node multiple times */ @@ -569,13 +573,69 @@ static void node_idle_scan_walker(const void *node, ndpi_VISIT which, int depth, ndpi_thread_info[thread_id].workflow->stats.ndpi_flow_count--; /* adding to a queue (we can't delete it from the tree inline ) */ - ndpi_thread_info[thread_id].workflow->idle_flows[ndpi_thread_info[thread_id].workflow->num_idle_flows++] = flow; + ndpi_thread_info[thread_id].idle_flows[ndpi_thread_info[thread_id].num_idle_flows++] = flow; } } } /* ***************************************************** */ +static void on_protocol_discovered(struct ndpi_workflow * workflow, + struct ndpi_flow_info * flow, + void * udata) { + const u_int16_t thread_id = *((u_int16_t *)udata); + + if(verbose > 1) { + if(enable_protocol_guess) { + if(flow->detected_protocol.protocol == NDPI_PROTOCOL_UNKNOWN) { + flow->detected_protocol.protocol = node_guess_undetected_protocol(thread_id, flow), + flow->detected_protocol.master_protocol = NDPI_PROTOCOL_UNKNOWN; + } + } + + printFlow(thread_id, flow); + } +} + +/* ***************************************************** */ + +static void debug_printf(u_int32_t protocol, void *id_struct, + ndpi_log_level_t log_level, + const char *format, ...) { + va_list va_ap; +#ifndef WIN32 + struct tm result; +#endif + + if(log_level <= nDPI_traceLevel) { + char buf[8192], out_buf[8192]; + char theDate[32]; + const char *extra_msg = ""; + time_t theTime = time(NULL); + + va_start (va_ap, format); + + if(log_level == NDPI_LOG_ERROR) + extra_msg = "ERROR: "; + else if(log_level == NDPI_LOG_TRACE) + extra_msg = "TRACE: "; + else + extra_msg = "DEBUG: "; + + memset(buf, 0, sizeof(buf)); + strftime(theDate, 32, "%d/%b/%Y %H:%M:%S", localtime_r(&theTime,&result) ); + vsnprintf(buf, sizeof(buf)-1, format, va_ap); + + snprintf(out_buf, sizeof(out_buf), "%s %s%s", theDate, extra_msg, buf); + printf("%s", out_buf); + fflush(stdout); + } + + va_end(va_ap); +} + +/* ***************************************************** */ + static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle) { NDPI_PROTOCOL_BITMASK all; @@ -588,8 +648,10 @@ static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle) { prefs.detection_tick_resolution = detection_tick_resolution; memset(&ndpi_thread_info[thread_id], 0, sizeof(ndpi_thread_info[thread_id])); - ndpi_thread_info[thread_id].workflow = ndpi_workflow_init(&prefs, pcap_handle, malloc_wrapper, free_wrapper); + ndpi_thread_info[thread_id].workflow = ndpi_workflow_init(&prefs, pcap_handle, malloc_wrapper, free_wrapper, debug_printf); /* ndpi_thread_info[thread_id].workflow->ndpi_struct->http_dont_dissect_response = 1; */ + + ndpi_workflow_set_flow_detected_callback(ndpi_thread_info[thread_id].workflow, on_protocol_discovered, (void *)&thread_id); // enable all protocols NDPI_BITMASK_SET_ALL(all); @@ -1078,6 +1140,30 @@ static void pcap_packet_callback_checked(u_char *args, pcap_end.tv_sec = header->ts.tv_sec, pcap_end.tv_usec = header->ts.tv_usec; } + /* Idle flows cleanup */ + if(live_capture) { + if(ndpi_thread_info[thread_id].last_idle_scan_time + IDLE_SCAN_PERIOD < ndpi_thread_info[thread_id].workflow->last_time) { + /* scan for idle flows */ + ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[ndpi_thread_info[thread_id].idle_scan_idx], node_idle_scan_walker, &thread_id); + + /* remove idle flows (unfortunately we cannot do this inline) */ + while (ndpi_thread_info[thread_id].num_idle_flows > 0) { + + /* search and delete the idle flow from the "ndpi_flow_root" (see struct reader thread) - here flows are the node of a b-tree */ + ndpi_tdelete(ndpi_thread_info[thread_id].idle_flows[--ndpi_thread_info[thread_id].num_idle_flows], + &ndpi_thread_info[thread_id].workflow->ndpi_flows_root[ndpi_thread_info[thread_id].idle_scan_idx], + ndpi_workflow_node_cmp); + + /* free the memory associated to idle flow in "idle_flows" - (see struct reader thread)*/ + free_ndpi_flow_info(ndpi_thread_info[thread_id].idle_flows[ndpi_thread_info[thread_id].num_idle_flows]); + ndpi_free(ndpi_thread_info[thread_id].idle_flows[ndpi_thread_info[thread_id].num_idle_flows]); + } + + if(++ndpi_thread_info[thread_id].idle_scan_idx == ndpi_thread_info[thread_id].workflow->prefs.num_roots) ndpi_thread_info[thread_id].idle_scan_idx = 0; + ndpi_thread_info[thread_id].last_idle_scan_time = ndpi_thread_info[thread_id].workflow->last_time; + } + } + /* check for buffer changes */ if(memcmp(packet, packet_checked, header->caplen) != 0) printf("INTERNAL ERROR: ingress packet was nodified by nDPI: this should not happen [thread_id=%u, packetId=%lu]\n", diff --git a/src/include/ndpi_util.h b/src/include/ndpi_util.h index c7d1e6745..daa564b69 100644 --- a/src/include/ndpi_util.h +++ b/src/include/ndpi_util.h @@ -87,20 +87,25 @@ typedef struct ndpi_workflow_prefs { u_int32_t detection_tick_resolution; } ndpi_workflow_prefs_t; +struct ndpi_workflow; +/** workflow, flow, user data */ +typedef void (*ndpi_workflow_callback_ptr) (struct ndpi_workflow *, struct ndpi_flow_info *, void *); + typedef struct ndpi_workflow { u_int64_t last_time; - u_int64_t last_idle_scan_time; - u_int32_t idle_scan_idx; - u_int32_t num_idle_flows; /* TODO_EMA decide if idle flows will be handled */ struct ndpi_workflow_prefs prefs; struct ndpi_stats stats; + ndpi_workflow_callback_ptr __flow_detected_callback; + void * __flow_detected_udata; + ndpi_workflow_callback_ptr __flow_giveup_callback; + void * __flow_giveup_udata; + /* outside referencies */ pcap_t *pcap_handle; /* allocated by prefs */ - struct ndpi_flow_info **idle_flows; void **ndpi_flows_root; struct ndpi_detection_module_struct *ndpi_struct; } ndpi_workflow_t; @@ -109,7 +114,8 @@ typedef struct ndpi_workflow { struct ndpi_workflow * ndpi_workflow_init(const struct ndpi_workflow_prefs * prefs, pcap_t * pcap_handle, void * (*malloc_wrapper)(size_t), - void (*free_wrapper)(void*)); + void (*free_wrapper)(void*), + ndpi_debug_function_ptr ndpi_debug_printf); void ndpi_workflow_free(struct ndpi_workflow * workflow); @@ -117,5 +123,22 @@ void ndpi_workflow_free(struct ndpi_workflow * workflow); void ndpi_workflow_process_packet (struct ndpi_workflow * workflow, const struct pcap_pkthdr *header, const u_char *packet); + +/* flow callbacks: ndpi_flow_info will be freed right after */ +inline void ndpi_workflow_set_flow_detected_callback(struct ndpi_workflow * workflow, + ndpi_workflow_callback_ptr callback, + void * udata) { + workflow->__flow_detected_callback = callback; + workflow->__flow_detected_udata = udata; +} + +inline void ndpi_workflow_set_flow_giveup_callback(struct ndpi_workflow * workflow, + ndpi_workflow_callback_ptr callback, + void * udata) { + workflow->__flow_giveup_callback = callback; + workflow->__flow_giveup_udata = udata; +} + +int ndpi_workflow_node_cmp(const void *a, const void *b); #endif diff --git a/src/lib/ndpi_util.c b/src/lib/ndpi_util.c index 5899c81a1..51d6757e1 100644 --- a/src/lib/ndpi_util.c +++ b/src/lib/ndpi_util.c @@ -69,10 +69,6 @@ #include "ndpi_main.h" #include "ndpi_util.h" -/* TODO_EMA printf replace with API */ -/* TODO_EMA */ -#define json_flag 0 - /* ***************************************************** */ static void free_ndpi_flow_info(struct ndpi_flow_info *flow) { @@ -86,41 +82,6 @@ static void free_ndpi_flow_info(struct ndpi_flow_info *flow) { static const u_int8_t nDPI_traceLevel = 0; -static void debug_printf(u_int32_t protocol, void *id_struct, - ndpi_log_level_t log_level, - const char *format, ...) { - va_list va_ap; -#ifndef WIN32 - struct tm result; -#endif - - if(log_level <= nDPI_traceLevel) { - char buf[8192], out_buf[8192]; - char theDate[32]; - const char *extra_msg = ""; - time_t theTime = time(NULL); - - va_start (va_ap, format); - - if(log_level == NDPI_LOG_ERROR) - extra_msg = "ERROR: "; - else if(log_level == NDPI_LOG_TRACE) - extra_msg = "TRACE: "; - else - extra_msg = "DEBUG: "; - - memset(buf, 0, sizeof(buf)); - strftime(theDate, 32, "%d/%b/%Y %H:%M:%S", localtime_r(&theTime,&result) ); - vsnprintf(buf, sizeof(buf)-1, format, va_ap); - - snprintf(out_buf, sizeof(out_buf), "%s %s%s", theDate, extra_msg, buf); - printf("%s", out_buf); - fflush(stdout); - } - - va_end(va_ap); -} - /* ***************************************************** */ /* TODO remove in future... */ static void (*removeme_free_wrapper)(void*); @@ -128,11 +89,12 @@ static void (*removeme_free_wrapper)(void*); struct ndpi_workflow * ndpi_workflow_init(const struct ndpi_workflow_prefs * prefs, pcap_t * pcap_handle, void * (*malloc_wrapper)(size_t), - void (*free_wrapper)(void*)) { + void (*free_wrapper)(void*), + ndpi_debug_function_ptr ndpi_debug_printf) { /* TODO: just needed here to init ndpi malloc wrapper */ struct ndpi_detection_module_struct * module = ndpi_init_detection_module( - prefs->detection_tick_resolution, malloc_wrapper, free_wrapper, debug_printf); + prefs->detection_tick_resolution, malloc_wrapper, free_wrapper, ndpi_debug_printf); struct ndpi_workflow * workflow = ndpi_calloc(1, sizeof(struct ndpi_workflow)); @@ -143,11 +105,10 @@ struct ndpi_workflow * ndpi_workflow_init(const struct ndpi_workflow_prefs * pre workflow->ndpi_struct = module; if(workflow->ndpi_struct == NULL) { - printf("ERROR: global structure initialization failed\n"); + NDPI_LOG(0, NULL, NDPI_LOG_ERROR, "global structure initialization failed\n"); exit(-1); } - workflow->idle_flows = ndpi_calloc(workflow->num_idle_flows, sizeof(struct ndpi_flow_info *)); workflow->ndpi_flows_root = ndpi_calloc(workflow->prefs.num_roots, sizeof(void *)); return workflow; } @@ -155,13 +116,12 @@ struct ndpi_workflow * ndpi_workflow_init(const struct ndpi_workflow_prefs * pre void ndpi_workflow_free(struct ndpi_workflow * workflow) { ndpi_exit_detection_module(workflow->ndpi_struct, removeme_free_wrapper); free(workflow->ndpi_flows_root); - free(workflow->idle_flows); free(workflow); } /* ***************************************************** */ -static int node_cmp(const void *a, const void *b) { +int ndpi_workflow_node_cmp(const void *a, const void *b) { struct ndpi_flow_info *fa = (struct ndpi_flow_info*)a; struct ndpi_flow_info *fb = (struct ndpi_flow_info*)b; @@ -318,21 +278,21 @@ static struct ndpi_flow_info *get_ndpi_flow_info(struct ndpi_workflow * workflow flow.lower_port = lower_port, flow.upper_port = upper_port; if(0) - printf("[NDPI] [%u][%u:%u <-> %u:%u]\n", + NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_DEBUG, "[NDPI] [%u][%u:%u <-> %u:%u]\n", iph->protocol, lower_ip, ntohs(lower_port), upper_ip, ntohs(upper_port)); idx = (vlan_id + lower_ip + upper_ip + iph->protocol + lower_port + upper_port) % workflow->prefs.num_roots; - ret = ndpi_tfind(&flow, &workflow->ndpi_flows_root[idx], node_cmp); + ret = ndpi_tfind(&flow, &workflow->ndpi_flows_root[idx], ndpi_workflow_node_cmp); if(ret == NULL) { if(workflow->stats.ndpi_flow_count == workflow->prefs.max_ndpi_flows) { - printf("ERROR: maximum flow count (%u) has been exceeded\n", workflow->prefs.max_ndpi_flows); + NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_ERROR, "maximum flow count (%u) has been exceeded\n", workflow->prefs.max_ndpi_flows); exit(-1); } else { struct ndpi_flow_info *newflow = (struct ndpi_flow_info*)malloc(sizeof(struct ndpi_flow_info)); if(newflow == NULL) { - printf("[NDPI] %s(1): not enough memory\n", __FUNCTION__); + NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_ERROR, "[NDPI] %s(1): not enough memory\n", __FUNCTION__); return(NULL); } @@ -351,27 +311,27 @@ static struct ndpi_flow_info *get_ndpi_flow_info(struct ndpi_workflow * workflow } if((newflow->ndpi_flow = ndpi_malloc(SIZEOF_FLOW_STRUCT)) == NULL) { - printf("[NDPI] %s(2): not enough memory\n", __FUNCTION__); + NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_ERROR, "[NDPI] %s(2): not enough memory\n", __FUNCTION__); free(newflow); return(NULL); } else memset(newflow->ndpi_flow, 0, SIZEOF_FLOW_STRUCT); if((newflow->src_id = ndpi_malloc(SIZEOF_ID_STRUCT)) == NULL) { - printf("[NDPI] %s(3): not enough memory\n", __FUNCTION__); + NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_ERROR, "[NDPI] %s(3): not enough memory\n", __FUNCTION__); free(newflow); return(NULL); } else memset(newflow->src_id, 0, SIZEOF_ID_STRUCT); if((newflow->dst_id = ndpi_malloc(SIZEOF_ID_STRUCT)) == NULL) { - printf("[NDPI] %s(4): not enough memory\n", __FUNCTION__); + NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_ERROR, "[NDPI] %s(4): not enough memory\n", __FUNCTION__); free(newflow); return(NULL); } else memset(newflow->dst_id, 0, SIZEOF_ID_STRUCT); - ndpi_tsearch(newflow, &workflow->ndpi_flows_root[idx], node_cmp); /* Add */ + ndpi_tsearch(newflow, &workflow->ndpi_flows_root[idx], ndpi_workflow_node_cmp); /* Add */ workflow->stats.ndpi_flow_count++; *src = newflow->src_id, *dst = newflow->dst_id; @@ -470,6 +430,7 @@ static unsigned int packet_processing(struct ndpi_workflow * workflow, return(0); } + /* Protocol already detected */ if(flow->detection_completed) return(0); flow->detected_protocol = ndpi_detection_process_packet(workflow->ndpi_struct, ndpi_flow, @@ -479,6 +440,7 @@ static unsigned int packet_processing(struct ndpi_workflow * workflow, if((flow->detected_protocol.protocol != NDPI_PROTOCOL_UNKNOWN) || ((proto == IPPROTO_UDP) && (flow->packets > 8)) || ((proto == IPPROTO_TCP) && (flow->packets > 10))) { + /* New protocol detected or give up */ flow->detection_completed = 1; if((flow->detected_protocol.protocol == NDPI_PROTOCOL_UNKNOWN) && (ndpi_flow->num_stun_udp_pkts > 0)) @@ -502,46 +464,19 @@ static unsigned int packet_processing(struct ndpi_workflow * workflow, snprintf(flow->ssl.server_certificate, sizeof(flow->ssl.server_certificate), "%s", flow->ndpi_flow->protos.ssl.server_certificate); } - if(flow->detected_protocol.protocol == NDPI_PROTOCOL_UNKNOWN) + if(flow->detected_protocol.protocol == NDPI_PROTOCOL_UNKNOWN) { flow->detected_protocol = ndpi_detection_giveup(workflow->ndpi_struct, flow->ndpi_flow); - + + if (workflow->__flow_giveup_callback != NULL) + workflow->__flow_giveup_callback(workflow, flow, workflow->__flow_giveup_udata); + } else { + if (workflow->__flow_detected_callback != NULL) + workflow->__flow_detected_callback(workflow, flow, workflow->__flow_detected_udata); + } + free_ndpi_flow_info(flow); - - /* TODO_EMA */ - /*if(verbose > 1) { - if(enable_protocol_guess) { - if(flow->detected_protocol.protocol == NDPI_PROTOCOL_UNKNOWN) { - flow->detected_protocol.protocol = node_guess_undetected_protocol(thread_id, flow), - flow->detected_protocol.master_protocol = NDPI_PROTOCOL_UNKNOWN; - } - } - - printFlow(thread_id, flow); - }*/ } - /* TODO_EMA use a callback */ - //~ if(live_capture) { - //~ if(ndpi_thread_info[thread_id].last_idle_scan_time + IDLE_SCAN_PERIOD < ndpi_thread_info[thread_id].last_time) { - //~ /* scan for idle flows */ - //~ ndpi_twalk(workflow->ndpi_flows_root[ndpi_thread_info[thread_id].idle_scan_idx], node_idle_scan_walker, &thread_id); - - //~ /* remove idle flows (unfortunately we cannot do this inline) */ - //~ while (ndpi_thread_info[thread_id].num_idle_flows > 0) { - - //~ /* search and delete the idle flow from the "ndpi_flow_root" (see struct reader thread) - here flows are the node of a b-tree */ - //~ ndpi_tdelete(ndpi_thread_info[thread_id].idle_flows[--ndpi_thread_info[thread_id].num_idle_flows], &workflow->ndpi_flows_root[ndpi_thread_info[thread_id].idle_scan_idx], node_cmp); - - //~ /* free the memory associated to idle flow in "idle_flows" - (see struct reader thread)*/ - //~ free_ndpi_flow_info(ndpi_thread_info[thread_id].idle_flows[ndpi_thread_info[thread_id].num_idle_flows]); - //~ ndpi_free(ndpi_thread_info[thread_id].idle_flows[ndpi_thread_info[thread_id].num_idle_flows]); - //~ } - - //~ if(++ndpi_thread_info[thread_id].idle_scan_idx == workflow->prefs.num_roots) ndpi_thread_info[thread_id].idle_scan_idx = 0; - //~ ndpi_thread_info[thread_id].last_idle_scan_time = ndpi_thread_info[thread_id].last_time; - //~ } - //~ } - return 0; } @@ -763,7 +698,8 @@ void ndpi_workflow_process_packet (struct ndpi_workflow * workflow, static u_int8_t cap_warning_used = 0; if(cap_warning_used == 0) { - if((!json_flag) && (!workflow->prefs.quiet_mode)) printf("\n\nWARNING: packet capture size is smaller than packet size, DETECTION MIGHT NOT WORK CORRECTLY\n\n"); + if(!workflow->prefs.quiet_mode) + NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_DEBUG, "\n\nWARNING: packet capture size is smaller than packet size, DETECTION MIGHT NOT WORK CORRECTLY\n\n"); cap_warning_used = 1; } } @@ -783,7 +719,8 @@ void ndpi_workflow_process_packet (struct ndpi_workflow * workflow, workflow->stats.fragmented_count++; if(ipv4_frags_warning_used == 0) { - if((!json_flag) && (!workflow->prefs.quiet_mode)) printf("\n\nWARNING: IPv4 fragments are not handled by this demo (nDPI supports them)\n"); + if(!workflow->prefs.quiet_mode) + NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_DEBUG, "\n\nWARNING: IPv4 fragments are not handled by this demo (nDPI supports them)\n"); ipv4_frags_warning_used = 1; } @@ -808,8 +745,8 @@ void ndpi_workflow_process_packet (struct ndpi_workflow * workflow, v4_warning: if(ipv4_warning_used == 0) { - if((!json_flag) && (!workflow->prefs.quiet_mode)) - printf("\n\nWARNING: only IPv4/IPv6 packets are supported in this demo (nDPI supports both IPv4 and IPv6), all other packets will be discarded\n\n"); + if(!workflow->prefs.quiet_mode) + NDPI_LOG(0, workflow.ndpi_struct, NDPI_LOG_DEBUG, "\n\nWARNING: only IPv4/IPv6 packets are supported in this demo (nDPI supports both IPv4 and IPv6), all other packets will be discarded\n\n"); ipv4_warning_used = 1; } workflow->stats.total_discarded_bytes += header->len; |