diff options
-rw-r--r-- | CMakeLists.txt | 3 | ||||
-rw-r--r-- | nDPId.c | 58 |
2 files changed, 44 insertions, 17 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 267b03593..788834ed6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -232,8 +232,7 @@ find_package(PCAP "1.8.1" REQUIRED) target_compile_options(nDPId PRIVATE "-pthread") target_compile_definitions(nDPId PRIVATE -DGIT_VERSION=\"${GIT_VERSION}\" ${NDPID_DEFS} ${ZLIB_DEFS}) target_include_directories(nDPId PRIVATE - "${STATIC_LIBNDPI_INC}" "${NDPI_INCLUDEDIR}" "${NDPI_INCLUDEDIR}/ndpi" - "${CMAKE_SOURCE_DIR}/dependencies/uthash/src") + "${STATIC_LIBNDPI_INC}" "${NDPI_INCLUDEDIR}" "${NDPI_INCLUDEDIR}/ndpi") target_link_libraries(nDPId "${STATIC_LIBNDPI_LIB}" "${pkgcfg_lib_NDPI_ndpi}" "${pkgcfg_lib_PCRE_pcre}" "${pkgcfg_lib_MAXMINDDB_maxminddb}" "${pkgcfg_lib_ZLIB_z}" "${GCRYPT_LIBRARY}" "${GCRYPT_ERROR_LIBRARY}" "${PCAP_LIBRARY}" "${LIBM_LIB}" @@ -39,6 +39,10 @@ #error "Compare and Swap aka __sync_fetch_and_add not available on your platform!" #endif +#if nDPId_MAX_READER_THREADS < 0 +#error "Invalid value for nDPId_MAX_READER_THREADS" +#endif + enum nDPId_l3_type { L3_IP, @@ -1039,6 +1043,7 @@ static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_F { case FT_UNKNOWN: case FT_SKIPPED: + written = 0; break; case FT_FINISHED: @@ -1046,11 +1051,14 @@ static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_F struct nDPId_flow_finished const * const flow_finished = (struct nDPId_flow_finished const *)flow_basic; #if 1 - written = snprintf(output + output_used, BUFSIZ, "%u,", flow_finished->flow_info.flow_extended.flow_id); + written = snprintf(output + output_used, + BUFSIZ - output_used, + "%u,", + flow_finished->flow_info.flow_extended.flow_id); #else written = snprintf(output + output_used, - BUFSIZ, + BUFSIZ - output_used, "[%u, %u, %llu],", flow_finished->flow_info.flow_extended.flow_id, flow_finished->flow_info.flow_extended.flow_basic.l4_protocol, @@ -1064,10 +1072,10 @@ static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_F struct nDPId_flow_info const * const flow_info = (struct nDPId_flow_info const *)flow_basic; #if 1 - written = snprintf(output + output_used, BUFSIZ, "%u,", flow_info->flow_extended.flow_id); + written = snprintf(output + output_used, BUFSIZ - output_used, "%u,", flow_info->flow_extended.flow_id); #else written = snprintf(output + output_used, - BUFSIZ, + BUFSIZ - output_used, "[%u, %u, %llu],", flow_info->flow_extended.flow_id, flow_info->flow_extended.flow_basic.l4_protocol, @@ -1081,7 +1089,14 @@ static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_F { break; } - output_used += written; + else + { + output_used += written; + if (output_used >= BUFSIZ) + { + break; + } + } } return (output_used > 0 ? output_used - 1 : 0); @@ -1899,14 +1914,11 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread, } errno = 0; - if (reader_thread->json_sock_reconnect == 0 && write(reader_thread->json_sockfd, newline_json_str, s_ret) != s_ret) + ssize_t written; + if (reader_thread->json_sock_reconnect == 0 && + (written = write(reader_thread->json_sockfd, newline_json_str, s_ret)) != s_ret) { saved_errno = errno; - syslog(LOG_DAEMON | LOG_ERR, - "[%8llu, %d] Send data to nDPIsrvd Collector failed: %s", - workflow->packets_captured, - reader_thread->array_index, - strerror(saved_errno)); if (saved_errno == EPIPE) { syslog(LOG_DAEMON | LOG_ERR, @@ -1920,10 +1932,26 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread, } else { - syslog(LOG_DAEMON | LOG_ERR, - "[%8llu, %d] Possible data loss detected", - workflow->packets_captured, - reader_thread->array_index); + fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) & ~O_NONBLOCK); + off_t pos = (written < 0 ? 0 : written); + while ((written = write(reader_thread->json_sockfd, newline_json_str + pos, s_ret - pos)) != s_ret - pos) + { + if (written < 0) + { + syslog(LOG_DAEMON | LOG_ERR, + "[%8llu, %d] Send data (blocking I/O) to nDPIsrvd Collector failed: %s", + workflow->packets_captured, + reader_thread->array_index, + strerror(saved_errno)); + reader_thread->json_sock_reconnect = 1; + break; + } + else + { + pos += written; + } + } + fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) & O_NONBLOCK); } } } |