diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2021-06-16 19:25:27 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2021-06-16 19:28:02 +0200 |
commit | db87d45edb175572112aa08840f08bc7c61cadcb (patch) | |
tree | e5c9ae7ff92404be13ba5b01a791be3f8a22f1f8 /nDPId.c | |
parent | fac7648326c6cea478b92872f7abb3f799961cfc (diff) |
Added zLib compression parameters to control compression conditions.
* more structs are now "compressable"
* fixed missing DAEMON_RECONNECT event
* improved memory profiler
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r-- | nDPId.c | 361 |
1 files changed, 236 insertions, 125 deletions
@@ -104,15 +104,22 @@ struct nDPId_flow_extended }; /* - * Structures related to certain flow states. + * Skipped flows need at least some information. */ struct nDPId_flow_skipped { struct nDPId_flow_basic flow_basic; }; -struct nDPI_data +/* + * Structure which is important for the detection process. + * The structure is also a compression target, if activated. + */ +struct nDPId_detection_data { + uint32_t last_ndpi_flow_struct_hash; + struct ndpi_proto detected_l7_protocol; + struct ndpi_proto guessed_l7_protocol; struct ndpi_flow_struct flow; struct ndpi_id_struct src; struct ndpi_id_struct dst; @@ -126,16 +133,11 @@ struct nDPId_flow_info uint8_t reserved_00 : 7; uint8_t reserved_01[1]; #ifdef ENABLE_ZLIB - uint16_t ndpi_compressed_size; + uint16_t detection_data_compressed_size; #else uint16_t reserved_02; #endif - uint32_t last_ndpi_flow_struct_hash; - - struct ndpi_proto detected_l7_protocol; - struct ndpi_proto guessed_l7_protocol; - - struct nDPI_data * ndpi; + struct nDPId_detection_data * detection_data; }; struct nDPId_flow_finished @@ -156,6 +158,12 @@ struct nDPId_workflow unsigned long long int total_l4_data_len; unsigned long long int detected_flow_protocols; +#ifdef ENABLE_MEMORY_PROFILING + uint64_t last_memory_usage_log_time; +#endif +#ifdef ENABLE_ZLIB + uint64_t last_compression_scan_time; +#endif uint64_t last_idle_scan_time; uint64_t last_time; @@ -329,6 +337,10 @@ static struct unsigned long long int max_idle_flows_per_thread; unsigned long long int tick_resolution; unsigned long long int reader_thread_count; +#ifdef ENABLE_ZLIB + unsigned long long int compression_scan_period; + unsigned long long int compression_flow_inactivity; +#endif unsigned long long int idle_scan_period; unsigned long long int generic_max_idle_time; unsigned long long int icmp_max_idle_time; @@ -344,6 +356,10 @@ static struct .max_idle_flows_per_thread = nDPId_MAX_IDLE_FLOWS_PER_THREAD / 2, .tick_resolution = nDPId_TICK_RESOLUTION, .reader_thread_count = nDPId_MAX_READER_THREADS / 2, +#ifdef ENABLE_ZLIB + .compression_scan_period = nDPId_COMPRESSION_SCAN_PERIOD, + .compression_flow_inactivity = nDPId_COMPRESSION_FLOW_INACTIVITY, +#endif .idle_scan_period = nDPId_IDLE_SCAN_PERIOD, .generic_max_idle_time = nDPId_GENERIC_IDLE_TIME, .icmp_max_idle_time = nDPId_ICMP_IDLE_TIME, @@ -360,6 +376,10 @@ enum nDPId_subopts TICK_RESOLUTION, MAX_READER_THREADS, IDLE_SCAN_PERIOD, +#ifdef ENABLE_ZLIB + COMPRESSION_SCAN_PERIOD, + COMPRESSION_FLOW_INACTIVITY, +#endif GENERIC_MAX_IDLE_TIME, ICMP_MAX_IDLE_TIME, UDP_MAX_IDLE_TIME, @@ -372,6 +392,10 @@ static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-th [MAX_IDLE_FLOWS_PER_THREAD] = "max-idle-flows-per-thread", [TICK_RESOLUTION] = "tick-resolution", [MAX_READER_THREADS] = "max-reader-threads", +#ifdef ENABLE_ZLIB + [COMPRESSION_SCAN_PERIOD] = "compression-scan-period", + [COMPRESSION_FLOW_INACTIVITY] = "compression-flow-activity", +#endif [IDLE_SCAN_PERIOD] = "idle-scan-period", [GENERIC_MAX_IDLE_TIME] = "generic-max-idle-time", [ICMP_MAX_IDLE_TIME] = "icmp-max-idle-time", @@ -456,6 +480,7 @@ static int zlib_inflate(const void * src, int srcLen, void * dst, int dstLen) ret = strm.total_out; #ifdef ENABLE_MEMORY_PROFILING __sync_fetch_and_add(&zlib_decompressions, 1); + __sync_fetch_and_sub(&zlib_compression_diff, ret - srcLen); #endif } else @@ -474,75 +499,137 @@ static int zlib_inflate(const void * src, int srcLen, void * dst, int dstLen) return ret; } -static int ndpi_data_deflate(struct nDPId_flow_info * const flow_info) +static int detection_data_deflate(struct nDPId_flow_info * const flow_info) { - uint8_t tmpOut[sizeof(*flow_info->ndpi)]; + uint8_t tmpOut[sizeof(*flow_info->detection_data)]; int ret; - if (flow_info->ndpi_compressed_size != 0) + if (flow_info->detection_data_compressed_size > 0) { return -7; } - ret = zlib_deflate(flow_info->ndpi, sizeof(*flow_info->ndpi), tmpOut, sizeof(tmpOut)); + ret = zlib_deflate(flow_info->detection_data, sizeof(*flow_info->detection_data), tmpOut, sizeof(tmpOut)); if (ret <= 0) { return ret; } - struct nDPI_data * const new_ndpi_data = ndpi_malloc(ret); - if (new_ndpi_data == NULL) + struct nDPId_detection_data * const new_det_data = ndpi_malloc(ret); + if (new_det_data == NULL) { return -8; } - ndpi_free(flow_info->ndpi); - flow_info->ndpi = new_ndpi_data; + ndpi_free(flow_info->detection_data); + flow_info->detection_data = new_det_data; - memcpy(flow_info->ndpi, tmpOut, ret); - flow_info->ndpi_compressed_size = ret; + memcpy(flow_info->detection_data, tmpOut, ret); + flow_info->detection_data_compressed_size = ret; return ret; } -static int ndpi_data_inflate(struct nDPId_flow_info * const flow_info) +static int detection_data_inflate(struct nDPId_flow_info * const flow_info) { - uint8_t tmpOut[sizeof(*flow_info->ndpi)]; + uint8_t tmpOut[sizeof(*flow_info->detection_data)]; int ret; - if (flow_info->ndpi_compressed_size == 0) + if (flow_info->detection_data_compressed_size == 0) { return -7; } - ret = zlib_inflate(flow_info->ndpi, flow_info->ndpi_compressed_size, tmpOut, sizeof(tmpOut)); + ret = zlib_inflate(flow_info->detection_data, flow_info->detection_data_compressed_size, tmpOut, sizeof(tmpOut)); if (ret <= 0) { return ret; } - struct nDPI_data * const new_ndpi_data = ndpi_malloc(ret); - if (new_ndpi_data == NULL) + struct nDPId_detection_data * const new_det_data = ndpi_malloc(ret); + if (new_det_data == NULL) { return -8; } - ndpi_free(flow_info->ndpi); - flow_info->ndpi = new_ndpi_data; + ndpi_free(flow_info->detection_data); + flow_info->detection_data = new_det_data; - memcpy(flow_info->ndpi, tmpOut, ret); - flow_info->ndpi_compressed_size = 0; + memcpy(flow_info->detection_data, tmpOut, ret); + flow_info->detection_data_compressed_size = 0; /* - * Do not use ndpi_id_struct's from ndpi_flow - * as they may not be valid anymore. + * The ndpi_id_struct's from ndpi_flow may not be valid anymore. * nDPI only updates those pointers while processing packets! * This is especially important when using compression * to prevent use of dangling pointers. */ - flow_info->ndpi->flow.src = &flow_info->ndpi->src; - flow_info->ndpi->flow.dst = &flow_info->ndpi->dst; + flow_info->detection_data->flow.src = &flow_info->detection_data->src; + flow_info->detection_data->flow.dst = &flow_info->detection_data->dst; return ret; } + +static void ndpi_comp_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) +{ + struct nDPId_workflow * const workflow = (struct nDPId_workflow *)user_data; + struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A; + + (void)depth; + + if (workflow == NULL || flow_basic == NULL) + { + return; + } + + if (which == ndpi_preorder || which == ndpi_leaf) + { + switch (flow_basic->type) + { + case FT_UNKNOWN: + case FT_SKIPPED: + case FT_FINISHED: + break; + + case FT_INFO: + { + if (flow_basic->last_seen + nDPId_options.compression_flow_inactivity < workflow->last_time) + { + struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic; + + if (flow_info->detection_data_compressed_size > 0) + { + break; + } + + int ret = detection_data_deflate(flow_info); + + if (ret <= 0) + { + syslog(LOG_DAEMON | LOG_ERR, + "zLib compression failed for flow %u with error code: %d", + flow_info->flow_extended.flow_id, + ret); + } + } + break; + } + } + } +} + +static void check_for_compressable_flows(struct nDPId_reader_thread * const reader_thread) +{ + struct nDPId_workflow * const workflow = reader_thread->workflow; + + if (workflow->last_compression_scan_time + nDPId_options.compression_scan_period < workflow->last_time) + { + for (size_t comp_scan_index = 0; comp_scan_index < workflow->max_active_flows; ++comp_scan_index) + { + ndpi_twalk(workflow->ndpi_flows_active[comp_scan_index], ndpi_comp_scan_walker, workflow); + } + + workflow->last_compression_scan_time = workflow->last_time; + } +} #endif static void ip_netmask_to_subnet(union nDPId_ip const * const ip, @@ -829,6 +916,49 @@ static void ndpi_free_wrapper(void * const freeable) free(p); } + +static void log_memory_usage(struct nDPId_reader_thread * const reader_thread) +{ + struct nDPId_workflow * const workflow = reader_thread->workflow; + + if (workflow->last_memory_usage_log_time + nDPId_LOG_MEMORY_USAGE_EVERY < workflow->last_time) + { + if (reader_thread->array_index == 0) + { + uint64_t alloc_count = __sync_fetch_and_add(&ndpi_memory_alloc_count, 0); + uint64_t free_count = __sync_fetch_and_add(&ndpi_memory_free_count, 0); + uint64_t alloc_bytes = __sync_fetch_and_add(&ndpi_memory_alloc_bytes, 0); + uint64_t free_bytes = __sync_fetch_and_add(&ndpi_memory_free_bytes, 0); + + syslog(LOG_DAEMON, + "MemoryProfiler: %llu allocs, %llu frees, %llu bytes allocated, %llu bytes freed, %llu blocks in " + "use, " + "%llu bytes in use", + (long long unsigned int)alloc_count, + (long long unsigned int)free_count, + (long long unsigned int)alloc_bytes, + (long long unsigned int)free_bytes, + (long long unsigned int)(alloc_count - free_count), + (long long unsigned int)(alloc_bytes - free_bytes)); +#ifdef ENABLE_ZLIB + uint64_t zlib_compression_count = __sync_fetch_and_add(&zlib_compressions, 0); + uint64_t zlib_decompression_count = __sync_fetch_and_add(&zlib_decompressions, 0); + uint64_t zlib_bytes_diff = __sync_fetch_and_add(&zlib_compression_diff, 0); + + syslog(LOG_DAEMON, + "MemoryProfiler (zLib): %llu compressions, %llu decompressions, %llu compressed blocks in use, %llu " + "bytes " + "diff", + (long long unsigned int)zlib_compression_count, + (long long unsigned int)zlib_decompression_count, + (long long unsigned int)zlib_compression_count - (long long unsigned int)zlib_decompression_count, + (long long unsigned int)zlib_bytes_diff); +#endif + } + + workflow->last_memory_usage_log_time = workflow->last_time; + } +} #endif static struct nDPId_workflow * init_workflow(char const * const file_or_device) @@ -950,25 +1080,27 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device) return workflow; } -static void free_ndpi_structs(struct nDPId_flow_info * const flow_info) +static void free_detection_data(struct nDPId_flow_info * const flow_info) { - ndpi_free_flow_data(&flow_info->ndpi->flow); - ndpi_free(flow_info->ndpi); - flow_info->ndpi = NULL; + ndpi_free_flow_data(&flow_info->detection_data->flow); + ndpi_free(flow_info->detection_data); + flow_info->detection_data = NULL; } -static int alloc_ndpi_structs(struct nDPId_flow_info * const flow_info) +static int alloc_detection_data(struct nDPId_flow_info * const flow_info) { - flow_info->ndpi = (struct nDPI_data *)ndpi_flow_malloc(sizeof(*flow_info->ndpi)); + flow_info->detection_data = (struct nDPId_detection_data *)ndpi_flow_malloc(sizeof(*flow_info->detection_data)); - if (flow_info->ndpi == NULL) + if (flow_info->detection_data == NULL) { goto error; } + memset(flow_info->detection_data, 0, sizeof(*flow_info->detection_data)); + return 0; error: - free_ndpi_structs(flow_info); + free_detection_data(flow_info); return 1; } @@ -986,7 +1118,7 @@ static void ndpi_flow_info_freer(void * const node) case FT_INFO: { struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic; - free_ndpi_structs(flow_info); + free_detection_data(flow_info); break; } } @@ -1295,9 +1427,9 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread, struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic; #ifdef ENABLE_ZLIB - if (nDPId_options.enable_zlib_compression != 0 && flow_info->ndpi_compressed_size > 0) + if (nDPId_options.enable_zlib_compression != 0 && flow_info->detection_data_compressed_size > 0) { - int ret = ndpi_data_inflate(flow_info); + int ret = detection_data_inflate(flow_info); if (ret <= 0) { syslog(LOG_DAEMON | LOG_ERR, "zLib decompression failed with error code: %d", ret); @@ -1310,12 +1442,11 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread, { uint8_t protocol_was_guessed = 0; - if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_info->guessed_l7_protocol) == 0) + if (ndpi_is_protocol_detected(workflow->ndpi_struct, + flow_info->detection_data->guessed_l7_protocol) == 0) { - flow_info->guessed_l7_protocol = ndpi_detection_giveup(workflow->ndpi_struct, - &flow_info->ndpi->flow, - 1, - &protocol_was_guessed); + flow_info->detection_data->guessed_l7_protocol = ndpi_detection_giveup( + workflow->ndpi_struct, &flow_info->detection_data->flow, 1, &protocol_was_guessed); } else { @@ -1355,38 +1486,6 @@ static void check_for_idle_flows(struct nDPId_reader_thread * const reader_threa if (workflow->last_idle_scan_time + nDPId_options.idle_scan_period < workflow->last_time) { -#ifdef ENABLE_MEMORY_PROFILING - if (reader_thread->array_index == 0) - { - uint64_t alloc_count = __sync_fetch_and_add(&ndpi_memory_alloc_count, 0); - uint64_t free_count = __sync_fetch_and_add(&ndpi_memory_free_count, 0); - uint64_t alloc_bytes = __sync_fetch_and_add(&ndpi_memory_alloc_bytes, 0); - uint64_t free_bytes = __sync_fetch_and_add(&ndpi_memory_free_bytes, 0); - - syslog(LOG_DAEMON, - "MemoryProfiler: %llu allocs, %llu frees, %llu bytes allocated, %llu bytes freed, %llu blocks in " - "use, " - "%llu bytes in use", - (long long unsigned int)alloc_count, - (long long unsigned int)free_count, - (long long unsigned int)alloc_bytes, - (long long unsigned int)free_bytes, - (long long unsigned int)(alloc_count - free_count), - (long long unsigned int)(alloc_bytes - free_bytes)); -#ifdef ENABLE_ZLIB - uint64_t zlib_compression_count = __sync_fetch_and_add(&zlib_compressions, 0); - uint64_t zlib_decompression_count = __sync_fetch_and_add(&zlib_decompressions, 0); - uint64_t zlib_bytes_diff = __sync_fetch_and_add(&zlib_compression_diff, 0); - - syslog(LOG_DAEMON, - "MemoryProfiler (zLib): %llu compressions, %llu decompressions, %llu bytes difference", - (long long unsigned int)zlib_compression_count, - (long long unsigned int)zlib_decompression_count, - (long long unsigned int)zlib_bytes_diff); -#endif - } -#endif - for (size_t idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index) { ndpi_twalk(workflow->ndpi_flows_active[idle_scan_index], ndpi_idle_scan_walker, workflow); @@ -1615,6 +1714,7 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread, "[%8llu, %d] Reconnected to JSON sink", workflow->packets_captured, reader_thread->array_index); + jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT); } } @@ -1883,8 +1983,8 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, case FLOW_EVENT_NOT_DETECTED: case FLOW_EVENT_GUESSED: if (ndpi_dpi2json(workflow->ndpi_struct, - &flow->ndpi->flow, - flow->guessed_l7_protocol, + &flow->detection_data->flow, + flow->detection_data->guessed_l7_protocol, &workflow->ndpi_serializer) != 0) { syslog(LOG_DAEMON | LOG_ERR, @@ -1897,8 +1997,8 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, case FLOW_EVENT_DETECTED: case FLOW_EVENT_DETECTION_UPDATE: if (ndpi_dpi2json(workflow->ndpi_struct, - &flow->ndpi->flow, - flow->detected_l7_protocol, + &flow->detection_data->flow, + flow->detection_data->detected_l7_protocol, &workflow->ndpi_serializer) != 0) { syslog(LOG_DAEMON | LOG_ERR, @@ -2471,6 +2571,9 @@ static void ndpi_process_packet(uint8_t * const args, workflow->last_time = time_ms; check_for_idle_flows(reader_thread); +#ifdef ENABLE_MEMORY_PROFILING + log_memory_usage(reader_thread); +#endif if (process_datalink_layer(reader_thread, header, packet, &ip_offset, &type) != 0) { @@ -2806,7 +2909,7 @@ static void ndpi_process_packet(uint8_t * const args, workflow->total_active_flows++; flow_to_process->flow_extended.flow_id = __sync_fetch_and_add(&global_flow_id, 1); - if (alloc_ndpi_structs(flow_to_process) != 0) + if (alloc_detection_data(flow_to_process) != 0) { jsonize_packet_event( reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD); @@ -2815,10 +2918,8 @@ static void ndpi_process_packet(uint8_t * const args, return; } - memset(flow_to_process->ndpi, 0, sizeof(*flow_to_process->ndpi)); - - ndpi_src = &flow_to_process->ndpi->src; - ndpi_dst = &flow_to_process->ndpi->dst; + ndpi_src = &flow_to_process->detection_data->src; + ndpi_dst = &flow_to_process->detection_data->dst; is_new_flow = 1; } @@ -2849,9 +2950,9 @@ static void ndpi_process_packet(uint8_t * const args, flow_to_process = (struct nDPId_flow_info *)flow_basic_to_process; #ifdef ENABLE_ZLIB - if (nDPId_options.enable_zlib_compression != 0 && flow_to_process->ndpi_compressed_size > 0) + if (nDPId_options.enable_zlib_compression != 0 && flow_to_process->detection_data_compressed_size > 0) { - int ret = ndpi_data_inflate(flow_to_process); + int ret = detection_data_inflate(flow_to_process); if (ret <= 0) { syslog(LOG_DAEMON | LOG_ERR, @@ -2865,13 +2966,13 @@ static void ndpi_process_packet(uint8_t * const args, if (direction_changed != 0) { - ndpi_src = &flow_to_process->ndpi->dst; - ndpi_dst = &flow_to_process->ndpi->src; + ndpi_src = &flow_to_process->detection_data->dst; + ndpi_dst = &flow_to_process->detection_data->src; } else { - ndpi_src = &flow_to_process->ndpi->src; - ndpi_dst = &flow_to_process->ndpi->dst; + ndpi_src = &flow_to_process->detection_data->src; + ndpi_dst = &flow_to_process->detection_data->dst; } } @@ -2907,7 +3008,7 @@ static void ndpi_process_packet(uint8_t * const args, &flow_to_process->flow_extended, PACKET_EVENT_PAYLOAD_FLOW); - if (flow_to_process->ndpi->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process - 1) + if (flow_to_process->detection_data->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process - 1) { if (flow_to_process->detection_completed != 0) { @@ -2917,8 +3018,8 @@ static void ndpi_process_packet(uint8_t * const args, { /* last chance to guess something, better then nothing */ uint8_t protocol_was_guessed = 0; - flow_to_process->guessed_l7_protocol = - ndpi_detection_giveup(workflow->ndpi_struct, &flow_to_process->ndpi->flow, 1, &protocol_was_guessed); + flow_to_process->detection_data->guessed_l7_protocol = ndpi_detection_giveup( + workflow->ndpi_struct, &flow_to_process->detection_data->flow, 1, &protocol_was_guessed); if (protocol_was_guessed != 0) { jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_GUESSED); @@ -2930,48 +3031,43 @@ static void ndpi_process_packet(uint8_t * const args, } } - flow_to_process->detected_l7_protocol = ndpi_detection_process_packet(workflow->ndpi_struct, - &flow_to_process->ndpi->flow, - ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6, - ip_size, - time_ms, - ndpi_src, - ndpi_dst); + flow_to_process->detection_data->detected_l7_protocol = + ndpi_detection_process_packet(workflow->ndpi_struct, + &flow_to_process->detection_data->flow, + ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6, + ip_size, + time_ms, + ndpi_src, + ndpi_dst); - if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_to_process->detected_l7_protocol) != 0 && + if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_to_process->detection_data->detected_l7_protocol) != 0 && flow_to_process->detection_completed == 0) { flow_to_process->detection_completed = 1; workflow->detected_flow_protocols++; jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTED); - flow_to_process->last_ndpi_flow_struct_hash = calculate_ndpi_flow_struct_hash(&flow_to_process->ndpi->flow); + flow_to_process->detection_data->last_ndpi_flow_struct_hash = + calculate_ndpi_flow_struct_hash(&flow_to_process->detection_data->flow); } else if (flow_to_process->detection_completed == 1) { - uint32_t hash = calculate_ndpi_flow_struct_hash(&flow_to_process->ndpi->flow); - if (hash != flow_to_process->last_ndpi_flow_struct_hash) + uint32_t hash = calculate_ndpi_flow_struct_hash(&flow_to_process->detection_data->flow); + if (hash != flow_to_process->detection_data->last_ndpi_flow_struct_hash) { jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE); - flow_to_process->last_ndpi_flow_struct_hash = hash; + flow_to_process->detection_data->last_ndpi_flow_struct_hash = hash; } } - if (flow_to_process->ndpi->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process) + if (flow_to_process->detection_data->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process) { - free_ndpi_structs(flow_to_process); + free_detection_data(flow_to_process); flow_to_process->flow_extended.flow_basic.type = FT_FINISHED; } #ifdef ENABLE_ZLIB else if (nDPId_options.enable_zlib_compression != 0) { - int ret = ndpi_data_deflate(flow_to_process); - if (ret <= 0) - { - syslog(LOG_DAEMON | LOG_ERR, - "zLib compression failed for flow %u with error code: %d", - flow_to_process->flow_extended.flow_id, - ret); - } + check_for_compressable_flows(reader_thread); } #endif } @@ -3255,7 +3351,8 @@ static void print_subopt_usage(void) if (*token != NULL) { fprintf(stderr, "\t\t%s = ", *token); - switch (index++) + enum nDPId_subopts subopts = index++; + switch (subopts) { case MAX_FLOWS_PER_THREAD: fprintf(stderr, "%llu\n", nDPId_options.max_flows_per_thread); @@ -3272,6 +3369,14 @@ static void print_subopt_usage(void) case IDLE_SCAN_PERIOD: fprintf(stderr, "%llu\n", nDPId_options.idle_scan_period); break; +#ifdef ENABLE_ZLIB + case COMPRESSION_SCAN_PERIOD: + fprintf(stderr, "%llu\n", nDPId_options.compression_scan_period); + break; + case COMPRESSION_FLOW_INACTIVITY: + fprintf(stderr, "%llu\n", nDPId_options.compression_flow_inactivity); + break; +#endif case GENERIC_MAX_IDLE_TIME: fprintf(stderr, "%llu\n", nDPId_options.generic_max_idle_time); break; @@ -3293,8 +3398,6 @@ static void print_subopt_usage(void) case MAX_PACKETS_PER_FLOW_TO_PROCESS: fprintf(stderr, "%llu\n", nDPId_options.max_packets_per_flow_to_process); break; - default: - break; } } else @@ -3468,6 +3571,14 @@ static int nDPId_parse_options(int argc, char ** argv) case IDLE_SCAN_PERIOD: nDPId_options.idle_scan_period = value_llu; break; +#ifdef ENABLE_ZLIB + case COMPRESSION_SCAN_PERIOD: + nDPId_options.compression_scan_period = value_llu; + break; + case COMPRESSION_FLOW_INACTIVITY: + nDPId_options.compression_flow_inactivity = value_llu; + break; +#endif case GENERIC_MAX_IDLE_TIME: nDPId_options.generic_max_idle_time = value_llu; break; @@ -3665,7 +3776,7 @@ int main(int argc, char ** argv) openlog("nDPId", LOG_CONS | LOG_PERROR, LOG_DAEMON); #ifdef ENABLE_MEMORY_PROFILING - syslog(LOG_DAEMON, "size/flow: %zu bytes\n", sizeof(struct nDPId_flow_info) + sizeof(struct nDPI_data)); + syslog(LOG_DAEMON, "size/flow: %zu bytes\n", sizeof(struct nDPId_flow_info) + sizeof(struct nDPId_detection_data)); #endif if (setup_reader_threads() != 0) |