diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2021-06-14 14:43:29 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2021-06-14 15:33:29 +0200 |
commit | fac7648326c6cea478b92872f7abb3f799961cfc (patch) | |
tree | d2ad3e1c3952f787b1b25259971b04f313d8e388 /nDPId.c | |
parent | 98b11f814f54bb23cdd58299e63dc49264e3b5bc (diff) |
Add experimental support for zLib compression of flow memory.
Please use this feature only for testing purposes.
It will change or be removed in the future.
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r-- | nDPId.c | 320 |
1 file changed, 281 insertions, 39 deletions
@@ -18,6 +18,9 @@ #include <sys/un.h> #include <syslog.h> #include <unistd.h> +#ifdef ENABLE_ZLIB +#include <zlib.h> +#endif #include "config.h" #include "utils.h" @@ -108,21 +111,31 @@ struct nDPId_flow_skipped struct nDPId_flow_basic flow_basic; }; +struct nDPI_data +{ + struct ndpi_flow_struct flow; + struct ndpi_id_struct src; + struct ndpi_id_struct dst; +}; + struct nDPId_flow_info { struct nDPId_flow_extended flow_extended; uint8_t detection_completed : 1; uint8_t reserved_00 : 7; - uint8_t reserved_01[3]; + uint8_t reserved_01[1]; +#ifdef ENABLE_ZLIB + uint16_t ndpi_compressed_size; +#else + uint16_t reserved_02; +#endif uint32_t last_ndpi_flow_struct_hash; struct ndpi_proto detected_l7_protocol; struct ndpi_proto guessed_l7_protocol; - struct ndpi_flow_struct * ndpi_flow; - struct ndpi_id_struct * ndpi_src; - struct ndpi_id_struct * ndpi_dst; + struct nDPI_data * ndpi; }; struct nDPId_flow_finished @@ -281,6 +294,11 @@ static uint64_t ndpi_memory_alloc_count = 0; static uint64_t ndpi_memory_alloc_bytes = 0; static uint64_t ndpi_memory_free_count = 0; static uint64_t ndpi_memory_free_bytes = 0; +#ifdef ENABLE_ZLIB +static uint64_t zlib_compressions = 0; +static uint64_t zlib_decompressions = 0; +static uint64_t zlib_compression_diff = 0; +#endif #endif static struct @@ -302,6 +320,9 @@ static struct char * custom_ja3_file; char * custom_sha1_file; char json_sockpath[UNIX_PATH_MAX]; +#ifdef ENABLE_ZLIB + uint8_t enable_zlib_compression; +#endif /* subopts */ char * instance_alias; unsigned long long int max_flows_per_thread; @@ -367,6 +388,163 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, struct nDPId_flow_info * const flow, enum flow_event event); +#ifdef ENABLE_ZLIB +static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstLen) +{ + z_stream strm = {0}; + strm.total_in = strm.avail_in = srcLen; + strm.total_out = strm.avail_out = dstLen; + strm.next_in = (Bytef *)src; + strm.next_out = (Bytef 
*)dst; + + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + + int err = -1; + int ret = -1; + + err = deflateInit(&strm, Z_BEST_COMPRESSION); + if (err == Z_OK) + { + err = deflate(&strm, Z_FINISH); + if (err == Z_STREAM_END) + { + ret = strm.total_out; +#ifdef ENABLE_MEMORY_PROFILING + __sync_fetch_and_add(&zlib_compressions, 1); + __sync_fetch_and_add(&zlib_compression_diff, srcLen - ret); +#endif + } + else + { + deflateEnd(&strm); + return err; + } + } + else + { + deflateEnd(&strm); + return err; + } + + deflateEnd(&strm); + return ret; +} + +static int zlib_inflate(const void * src, int srcLen, void * dst, int dstLen) +{ + z_stream strm = {0}; + strm.total_in = strm.avail_in = srcLen; + strm.total_out = strm.avail_out = dstLen; + strm.next_in = (Bytef *)src; + strm.next_out = (Bytef *)dst; + + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + + int err = -1; + int ret = -1; + + err = inflateInit2(&strm, (15 + 32)); // 15 window bits, and the +32 tells zlib to to detect if using gzip or zlib + if (err == Z_OK) + { + err = inflate(&strm, Z_FINISH); + if (err == Z_STREAM_END) + { + ret = strm.total_out; +#ifdef ENABLE_MEMORY_PROFILING + __sync_fetch_and_add(&zlib_decompressions, 1); +#endif + } + else + { + inflateEnd(&strm); + return err; + } + } + else + { + inflateEnd(&strm); + return err; + } + + inflateEnd(&strm); + return ret; +} + +static int ndpi_data_deflate(struct nDPId_flow_info * const flow_info) +{ + uint8_t tmpOut[sizeof(*flow_info->ndpi)]; + int ret; + + if (flow_info->ndpi_compressed_size != 0) + { + return -7; + } + + ret = zlib_deflate(flow_info->ndpi, sizeof(*flow_info->ndpi), tmpOut, sizeof(tmpOut)); + if (ret <= 0) + { + return ret; + } + + struct nDPI_data * const new_ndpi_data = ndpi_malloc(ret); + if (new_ndpi_data == NULL) + { + return -8; + } + ndpi_free(flow_info->ndpi); + flow_info->ndpi = new_ndpi_data; + + memcpy(flow_info->ndpi, tmpOut, ret); + flow_info->ndpi_compressed_size = ret; + + 
return ret; +} + +static int ndpi_data_inflate(struct nDPId_flow_info * const flow_info) +{ + uint8_t tmpOut[sizeof(*flow_info->ndpi)]; + int ret; + + if (flow_info->ndpi_compressed_size == 0) + { + return -7; + } + + ret = zlib_inflate(flow_info->ndpi, flow_info->ndpi_compressed_size, tmpOut, sizeof(tmpOut)); + if (ret <= 0) + { + return ret; + } + + struct nDPI_data * const new_ndpi_data = ndpi_malloc(ret); + if (new_ndpi_data == NULL) + { + return -8; + } + ndpi_free(flow_info->ndpi); + flow_info->ndpi = new_ndpi_data; + + memcpy(flow_info->ndpi, tmpOut, ret); + flow_info->ndpi_compressed_size = 0; + + /* + * Do not use ndpi_id_struct's from ndpi_flow + * as they may not be valid anymore. + * nDPI only updates those pointers while processing packets! + * This is especially important when using compression + * to prevent use of dangling pointers. + */ + flow_info->ndpi->flow.src = &flow_info->ndpi->src; + flow_info->ndpi->flow.dst = &flow_info->ndpi->dst; + + return ret; +} +#endif + static void ip_netmask_to_subnet(union nDPId_ip const * const ip, union nDPId_ip const * const netmask, union nDPId_ip * const subnet, @@ -774,21 +952,16 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device) static void free_ndpi_structs(struct nDPId_flow_info * const flow_info) { - ndpi_free(flow_info->ndpi_dst); - flow_info->ndpi_dst = NULL; - ndpi_free(flow_info->ndpi_src); - flow_info->ndpi_src = NULL; - ndpi_free_flow(flow_info->ndpi_flow); - flow_info->ndpi_flow = NULL; + ndpi_free_flow_data(&flow_info->ndpi->flow); + ndpi_free(flow_info->ndpi); + flow_info->ndpi = NULL; } static int alloc_ndpi_structs(struct nDPId_flow_info * const flow_info) { - flow_info->ndpi_dst = (struct ndpi_id_struct *)ndpi_calloc(1, SIZEOF_ID_STRUCT); - flow_info->ndpi_src = (struct ndpi_id_struct *)ndpi_calloc(1, SIZEOF_ID_STRUCT); - flow_info->ndpi_flow = (struct ndpi_flow_struct *)ndpi_flow_malloc(SIZEOF_FLOW_STRUCT); + flow_info->ndpi = (struct nDPI_data 
*)ndpi_flow_malloc(sizeof(*flow_info->ndpi)); - if (flow_info->ndpi_dst == NULL || flow_info->ndpi_src == NULL || flow_info->ndpi_flow == NULL) + if (flow_info->ndpi == NULL) { goto error; } @@ -1068,7 +1241,7 @@ static int ndpi_workflow_node_cmp(void const * const A, void const * const B) return 1; } - /* Flows have the same hash */ + /* flows have the same hash */ if (flow_basic_a->l4_protocol < flow_basic_b->l4_protocol) { return -1; @@ -1121,6 +1294,18 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread, { struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic; +#ifdef ENABLE_ZLIB + if (nDPId_options.enable_zlib_compression != 0 && flow_info->ndpi_compressed_size > 0) + { + int ret = ndpi_data_inflate(flow_info); + if (ret <= 0) + { + syslog(LOG_DAEMON | LOG_ERR, "zLib decompression failed with error code: %d", ret); + return; + } + } +#endif + if (flow_info->detection_completed == 0) { uint8_t protocol_was_guessed = 0; @@ -1128,7 +1313,7 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread, if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_info->guessed_l7_protocol) == 0) { flow_info->guessed_l7_protocol = ndpi_detection_giveup(workflow->ndpi_struct, - flow_info->ndpi_flow, + &flow_info->ndpi->flow, 1, &protocol_was_guessed); } @@ -1188,6 +1373,17 @@ static void check_for_idle_flows(struct nDPId_reader_thread * const reader_threa (long long unsigned int)free_bytes, (long long unsigned int)(alloc_count - free_count), (long long unsigned int)(alloc_bytes - free_bytes)); +#ifdef ENABLE_ZLIB + uint64_t zlib_compression_count = __sync_fetch_and_add(&zlib_compressions, 0); + uint64_t zlib_decompression_count = __sync_fetch_and_add(&zlib_decompressions, 0); + uint64_t zlib_bytes_diff = __sync_fetch_and_add(&zlib_compression_diff, 0); + + syslog(LOG_DAEMON, + "MemoryProfiler (zLib): %llu compressions, %llu decompressions, %llu bytes difference", + (long long unsigned 
int)zlib_compression_count, + (long long unsigned int)zlib_decompression_count, + (long long unsigned int)zlib_bytes_diff); +#endif } #endif @@ -1686,8 +1882,10 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, case FLOW_EVENT_NOT_DETECTED: case FLOW_EVENT_GUESSED: - if (ndpi_dpi2json( - workflow->ndpi_struct, flow->ndpi_flow, flow->guessed_l7_protocol, &workflow->ndpi_serializer) != 0) + if (ndpi_dpi2json(workflow->ndpi_struct, + &flow->ndpi->flow, + flow->guessed_l7_protocol, + &workflow->ndpi_serializer) != 0) { syslog(LOG_DAEMON | LOG_ERR, "[%8llu, %4u] ndpi_dpi2json failed for not-detected/guessed flow", @@ -1699,7 +1897,7 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, case FLOW_EVENT_DETECTED: case FLOW_EVENT_DETECTION_UPDATE: if (ndpi_dpi2json(workflow->ndpi_struct, - flow->ndpi_flow, + &flow->ndpi->flow, flow->detected_l7_protocol, &workflow->ndpi_serializer) != 0) { @@ -2516,8 +2714,8 @@ static void ndpi_process_packet(uint8_t * const args, { /* flow still not found, must be new or midstream */ - union nDPId_ip const * netmask; - union nDPId_ip const * subnet; + union nDPId_ip const * netmask = NULL; + union nDPId_ip const * subnet = NULL; switch (flow_basic.l3_type) { case L3_IP: @@ -2617,18 +2815,17 @@ static void ndpi_process_packet(uint8_t * const args, return; } - memset(flow_to_process->ndpi_flow, - 0, - (SIZEOF_FLOW_STRUCT > sizeof(struct ndpi_flow_struct) ? SIZEOF_FLOW_STRUCT - : sizeof(struct ndpi_flow_struct))); + memset(flow_to_process->ndpi, 0, sizeof(*flow_to_process->ndpi)); - ndpi_src = flow_to_process->ndpi_src; - ndpi_dst = flow_to_process->ndpi_dst; + ndpi_src = &flow_to_process->ndpi->src; + ndpi_dst = &flow_to_process->ndpi->dst; is_new_flow = 1; } else { + /* flow already exists in the tree */ + struct nDPId_flow_basic * const flow_basic_to_process = *(struct nDPId_flow_basic **)tree_result; /* Update last seen timestamp for timeout handling. 
*/ flow_basic_to_process->last_seen = time_ms; @@ -2651,15 +2848,30 @@ static void ndpi_process_packet(uint8_t * const args, } flow_to_process = (struct nDPId_flow_info *)flow_basic_to_process; +#ifdef ENABLE_ZLIB + if (nDPId_options.enable_zlib_compression != 0 && flow_to_process->ndpi_compressed_size > 0) + { + int ret = ndpi_data_inflate(flow_to_process); + if (ret <= 0) + { + syslog(LOG_DAEMON | LOG_ERR, + "zLib decompression failed for existing flow %u with error code: %d", + flow_to_process->flow_extended.flow_id, + ret); + return; + } + } +#endif + if (direction_changed != 0) { - ndpi_src = flow_to_process->ndpi_dst; - ndpi_dst = flow_to_process->ndpi_src; + ndpi_src = &flow_to_process->ndpi->dst; + ndpi_dst = &flow_to_process->ndpi->src; } else { - ndpi_src = flow_to_process->ndpi_src; - ndpi_dst = flow_to_process->ndpi_dst; + ndpi_src = &flow_to_process->ndpi->src; + ndpi_dst = &flow_to_process->ndpi->dst; } } @@ -2695,7 +2907,7 @@ static void ndpi_process_packet(uint8_t * const args, &flow_to_process->flow_extended, PACKET_EVENT_PAYLOAD_FLOW); - if (flow_to_process->ndpi_flow->num_processed_pkts == nDPId_options.max_packets_per_flow_to_process - 1) + if (flow_to_process->ndpi->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process - 1) { if (flow_to_process->detection_completed != 0) { @@ -2706,7 +2918,7 @@ static void ndpi_process_packet(uint8_t * const args, /* last chance to guess something, better then nothing */ uint8_t protocol_was_guessed = 0; flow_to_process->guessed_l7_protocol = - ndpi_detection_giveup(workflow->ndpi_struct, flow_to_process->ndpi_flow, 1, &protocol_was_guessed); + ndpi_detection_giveup(workflow->ndpi_struct, &flow_to_process->ndpi->flow, 1, &protocol_was_guessed); if (protocol_was_guessed != 0) { jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_GUESSED); @@ -2719,7 +2931,7 @@ static void ndpi_process_packet(uint8_t * const args, } flow_to_process->detected_l7_protocol = 
ndpi_detection_process_packet(workflow->ndpi_struct, - flow_to_process->ndpi_flow, + &flow_to_process->ndpi->flow, ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6, ip_size, time_ms, @@ -2732,11 +2944,11 @@ static void ndpi_process_packet(uint8_t * const args, flow_to_process->detection_completed = 1; workflow->detected_flow_protocols++; jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTED); - flow_to_process->last_ndpi_flow_struct_hash = calculate_ndpi_flow_struct_hash(flow_to_process->ndpi_flow); + flow_to_process->last_ndpi_flow_struct_hash = calculate_ndpi_flow_struct_hash(&flow_to_process->ndpi->flow); } else if (flow_to_process->detection_completed == 1) { - uint32_t hash = calculate_ndpi_flow_struct_hash(flow_to_process->ndpi_flow); + uint32_t hash = calculate_ndpi_flow_struct_hash(&flow_to_process->ndpi->flow); if (hash != flow_to_process->last_ndpi_flow_struct_hash) { jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE); @@ -2744,11 +2956,24 @@ static void ndpi_process_packet(uint8_t * const args, } } - if (flow_to_process->ndpi_flow->num_processed_pkts == nDPId_options.max_packets_per_flow_to_process) + if (flow_to_process->ndpi->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process) { free_ndpi_structs(flow_to_process); flow_to_process->flow_extended.flow_basic.type = FT_FINISHED; } +#ifdef ENABLE_ZLIB + else if (nDPId_options.enable_zlib_compression != 0) + { + int ret = ndpi_data_deflate(flow_to_process); + if (ret <= 0) + { + syslog(LOG_DAEMON | LOG_ERR, + "zLib compression failed for flow %u with error code: %d", + flow_to_process->flow_extended.flow_id, + ret); + } + } +#endif } static void run_pcap_loop(struct nDPId_reader_thread const * const reader_thread) @@ -3120,11 +3345,14 @@ static int nDPId_parse_options(int argc, char ** argv) "\t \tThis value is required for correct flow handling of\n" "\t \tmultiple instances and should be unique.\n" "\t \tDefaults to your hostname.\n" +#ifdef 
ENABLE_ZLIB + "\t-z\tEnable flow memory zLib compression. (Experimental!)\n" +#endif "\t-o\t(Carefully) Tune some daemon options. See subopts below.\n" "\t-v\tversion\n" "\t-h\tthis\n\n"; - while ((opt = getopt(argc, argv, "hi:IEB:lc:dp:u:g:P:C:J:S:a:o:vh")) != -1) + while ((opt = getopt(argc, argv, "hi:IEB:lc:dp:u:g:P:C:J:S:a:zo:vh")) != -1) { switch (opt) { @@ -3182,6 +3410,14 @@ static int nDPId_parse_options(int argc, char ** argv) case 'a': nDPId_options.instance_alias = strdup(optarg); break; + case 'z': +#ifdef ENABLE_ZLIB + nDPId_options.enable_zlib_compression = 1; + break; +#else + fprintf(stderr, "nDPId was built w/o zLib compression\n"); + return 1; +#endif case 'o': { int errfnd = 0; @@ -3283,6 +3519,12 @@ static int validate_options(char const * const arg0) { int retval = 0; +#ifdef ENABLE_ZLIB + if (nDPId_options.enable_zlib_compression != 0) + { + fprintf(stderr, "%s: WARNING: zLib compression is an experimental feature! Expect random crashes.\n", arg0); + } +#endif if (is_path_absolute("JSON socket", nDPId_options.json_sockpath) != 0) { retval = 1; @@ -3423,7 +3665,7 @@ int main(int argc, char ** argv) openlog("nDPId", LOG_CONS | LOG_PERROR, LOG_DAEMON); #ifdef ENABLE_MEMORY_PROFILING - syslog(LOG_DAEMON, "size/processed-flow: %zu bytes\n", sizeof(struct nDPId_flow_info)); + syslog(LOG_DAEMON, "size/flow: %zu bytes\n", sizeof(struct nDPId_flow_info) + sizeof(struct nDPI_data)); #endif if (setup_reader_threads() != 0) |