diff options
-rw-r--r-- | CMakeLists.txt | 8 | ||||
-rw-r--r-- | TODO.md | 7 | ||||
-rw-r--r-- | config.h | 1 | ||||
-rw-r--r-- | dependencies/nDPIsrvd.h | 1 | ||||
-rw-r--r-- | nDPId-test.c | 8 | ||||
-rw-r--r-- | nDPIsrvd.c | 263 | ||||
-rwxr-xr-x | scripts/daemon.sh | 8 |
7 files changed, 242 insertions, 54 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index f740432f9..b83c2ac44 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -181,7 +181,9 @@ endif() find_package(PCAP "1.8.1" REQUIRED) target_compile_options(nDPId PRIVATE "-pthread") target_compile_definitions(nDPId PRIVATE -DGIT_VERSION=\"${GIT_VERSION}\" ${NDPID_DEFS} ${ZLIB_DEFS}) -target_include_directories(nDPId PRIVATE "${STATIC_LIBNDPI_INC}" "${NDPI_INCLUDEDIR}" "${NDPI_INCLUDEDIR}/ndpi") +target_include_directories(nDPId PRIVATE + "${STATIC_LIBNDPI_INC}" "${NDPI_INCLUDEDIR}" "${NDPI_INCLUDEDIR}/ndpi" + "${CMAKE_SOURCE_DIR}/dependencies/uthash/src") target_link_libraries(nDPId "${STATIC_LIBNDPI_LIB}" "${pkgcfg_lib_NDPI_ndpi}" "${pkgcfg_lib_PCRE_pcre}" "${pkgcfg_lib_MAXMINDDB_maxminddb}" "${pkgcfg_lib_ZLIB_z}" "${GCRYPT_LIBRARY}" "${GCRYPT_ERROR_LIBRARY}" "${PCAP_LIBRARY}" "${LIBM_LIB}" @@ -201,7 +203,9 @@ target_include_directories(nDPId-test PRIVATE "${CMAKE_SOURCE_DIR}/dependencies/uthash/src") target_compile_options(nDPId-test PRIVATE "-Wno-unused-function" "-pthread") target_compile_definitions(nDPId-test PRIVATE ${NDPID_DEFS}) -target_include_directories(nDPId-test PRIVATE "${STATIC_LIBNDPI_INC}" "${NDPI_INCLUDEDIR}" "${NDPI_INCLUDEDIR}/ndpi") +target_include_directories(nDPId-test PRIVATE + "${STATIC_LIBNDPI_INC}" "${NDPI_INCLUDEDIR}" "${NDPI_INCLUDEDIR}/ndpi" + "${CMAKE_SOURCE_DIR}/dependencies/uthash/src") target_compile_definitions(nDPId-test PRIVATE "-D_GNU_SOURCE=1" "-DNO_MAIN=1" "-Dsyslog=mock_syslog_stderr" ${NDPID_TEST_MPROF_DEFS}) target_link_libraries(nDPId-test "${STATIC_LIBNDPI_LIB}" "${pkgcfg_lib_NDPI_ndpi}" "${pkgcfg_lib_PCRE_pcre}" "${pkgcfg_lib_MAXMINDDB_maxminddb}" @@ -1,6 +1,5 @@ # TODOs -1. improve nDPIsrvd buffer bloat handling (Do not fall back to blocking mode!) -2. improve UDP/TCP timeout handling by reading netfilter conntrack timeouts from /proc (or just read conntrack table entries) -3. detect interface / timeout changes and apply them to nDPId -4. 
implement AEAD crypto via libsodium (at least for TCP communication) +1. improve UDP/TCP timeout handling by reading netfilter conntrack timeouts from /proc (or just read conntrack table entries) +2. detect interface / timeout changes and apply them to nDPId +3. implement AEAD crypto via libsodium (at least for TCP communication) @@ -37,5 +37,6 @@ /* nDPIsrvd default config options */ #define nDPIsrvd_PIDFILE "/tmp/ndpisrvd.pid" +#define nDPIsrvd_CACHE_ARRAY_LENGTH 256 #endif diff --git a/dependencies/nDPIsrvd.h b/dependencies/nDPIsrvd.h index 51dc4750f..29523e24a 100644 --- a/dependencies/nDPIsrvd.h +++ b/dependencies/nDPIsrvd.h @@ -319,6 +319,7 @@ static inline int nDPIsrvd_buffer_init(struct nDPIsrvd_buffer * const buffer, si static inline void nDPIsrvd_buffer_free(struct nDPIsrvd_buffer * const buffer) { free(buffer->ptr.raw); + buffer->ptr.raw = NULL; } static inline struct nDPIsrvd_socket * nDPIsrvd_init(size_t global_user_data_size, diff --git a/nDPId-test.c b/nDPId-test.c index 06512b1ff..db9296864 100644 --- a/nDPId-test.c +++ b/nDPId-test.c @@ -103,12 +103,12 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) strncpy(mock_serv_desc->event_serv.peer_addr, "0.0.0.0", sizeof(mock_serv_desc->event_serv.peer_addr)); mock_serv_desc->event_serv.peer.sin_port = 0; - if (add_event(epollfd, mock_pipefds[PIPE_nDPIsrvd], mock_json_desc) != 0) + if (add_in_event(epollfd, mock_pipefds[PIPE_nDPIsrvd], mock_json_desc) != 0) { THREAD_ERROR_GOTO(arg); } - if (add_event(epollfd, mock_servfds[PIPE_WRITE], mock_serv_desc) != 0) + if (add_in_event(epollfd, mock_servfds[PIPE_WRITE], mock_serv_desc) != 0) { THREAD_ERROR_GOTO(arg); } @@ -126,7 +126,7 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) { if (events[i].data.ptr == mock_json_desc) { - if (handle_incoming_data_event(epollfd, &events[i]) != 0) + if (handle_data_event(epollfd, &events[i]) != 0) { goto error; } @@ -183,7 +183,7 @@ static void * distributor_client_mainloop_thread(void * const arg) { 
THREAD_ERROR_GOTO(arg); } - if (add_event(dis_epollfd, mock_servfds[PIPE_READ], NULL) != 0) + if (add_in_event(dis_epollfd, mock_servfds[PIPE_READ], NULL) != 0) { THREAD_ERROR_GOTO(arg); } diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 0e86a84d5..781071fe2 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -29,6 +29,7 @@ struct remote_desc enum sock_type sock_type; int fd; struct nDPIsrvd_buffer buf; + UT_array * buf_cache; union { struct { @@ -67,7 +68,150 @@ static struct char * serv_optarg; char * user; char * group; -} nDPIsrvd_options = {}; + nDPIsrvd_ull cache_array_length; + int cache_fallback_to_blocking; +} nDPIsrvd_options = {.cache_array_length = nDPIsrvd_CACHE_ARRAY_LENGTH, .cache_fallback_to_blocking = 1}; + +static int fcntl_add_flags(int fd, int flags); +static int fcntl_del_flags(int fd, int flags); +static void disconnect_client(int epollfd, struct remote_desc * const current); +static int add_in_event(int epollfd, int fd, void * ptr); +static int add_out_event(int epollfd, int fd, void * ptr); +static int del_event(int epollfd, int fd); +static int drain_cache_blocking(struct remote_desc * const remote); + +static void nDPIsrvd_buffer_array_copy(void * dst, const void * src) +{ + struct nDPIsrvd_buffer * const buf_dst = (struct nDPIsrvd_buffer *)dst; + struct nDPIsrvd_buffer const * const buf_src = (struct nDPIsrvd_buffer *)src; + + buf_dst->ptr.raw = NULL; + if (nDPIsrvd_buffer_init(buf_dst, buf_src->used) != 0) + { + return; + } + + buf_dst->json_string_start = buf_src->json_string_start; + buf_dst->json_string_length = buf_src->used; + buf_dst->json_string = buf_dst->ptr.text + buf_dst->json_string_start; + buf_dst->used = buf_src->used; + memcpy(buf_dst->ptr.raw, buf_src->ptr.raw, buf_src->used); +} + +static void nDPIsrvd_buffer_array_dtor(void * elt) +{ + struct nDPIsrvd_buffer * const buf = (struct nDPIsrvd_buffer *)elt; + + nDPIsrvd_buffer_free(buf); +} + +static const UT_icd nDPIsrvd_buffer_array_icd = {sizeof(struct nDPIsrvd_buffer), + NULL, + 
nDPIsrvd_buffer_array_copy, + nDPIsrvd_buffer_array_dtor}; + +static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, size_t siz) +{ + struct nDPIsrvd_buffer buf_src = {}; + + if (utarray_len(remote->buf_cache) >= nDPIsrvd_options.cache_array_length) + { + if (nDPIsrvd_options.cache_fallback_to_blocking == 0) + { + syslog(LOG_DAEMON | LOG_ERR, + "Buffer cache limit (%u lines) reached, remote too slow.", + utarray_len(remote->buf_cache)); + return -1; + } + else + { + syslog(LOG_DAEMON | LOG_ERR, + "Buffer cache limit (%u lines) reached, falling back to blocking I/O.", + utarray_len(remote->buf_cache)); + if (drain_cache_blocking(remote) != 0) + { + syslog(LOG_DAEMON | LOG_ERR, "Could not drain buffer cache in blocking I/O: %s", strerror(errno)); + return -1; + } + } + } + + buf_src.ptr.raw = buf; + buf_src.json_string_length = buf_src.used = siz; + utarray_push_back(remote->buf_cache, &buf_src); + remote->buf.used = 0; + + return 0; +} + +static int drain_cache(struct remote_desc * const remote) +{ + errno = 0; + + while (utarray_len(remote->buf_cache) > 0) + { + struct nDPIsrvd_buffer * buf = (struct nDPIsrvd_buffer *)utarray_front(remote->buf_cache); + ssize_t written = write(remote->fd, buf->ptr.raw + buf->json_string_start, buf->json_string_length); + switch (written) + { + case -1: + if (errno == EAGAIN) + { + return 0; + } + return -1; + case 0: + return -1; + default: + buf->json_string_start += written; + buf->json_string_length -= written; + if (buf->json_string_length == 0) + { + utarray_erase(remote->buf_cache, 0, 1); + } + break; + } + } + + return 0; +} + +static int drain_cache_blocking(struct remote_desc * const remote) +{ + int retval = 0; + + if (fcntl_del_flags(remote->fd, O_NONBLOCK) != 0) + { + syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno)); + return -1; + } + if (drain_cache(remote) != 0) + { + retval = -1; + } + if (fcntl_add_flags(remote->fd, O_NONBLOCK) != 0) + { + 
syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno)); + return -1; + } + + return retval; +} + +static int handle_outgoing_data(int epollfd, struct remote_desc * const remote) +{ + if (remote->sock_type != SERV_SOCK) + { + return -1; + } + if (drain_cache(remote) != 0) + { + return -1; + } + del_event(epollfd, remote->fd); + + return 0; +} static int fcntl_add_flags(int fd, int flags) { @@ -87,7 +231,7 @@ static int fcntl_del_flags(int fd, int flags) if (cur_flags == -1) { - return 1; + return -1; } return fcntl(fd, F_SETFL, cur_flags & ~flags); @@ -103,7 +247,15 @@ static int create_listen_sockets(void) return 1; } - int opt = 1; + int opt = NETWORK_BUFFER_MAX_SIZE * 16; + if (setsockopt(serv_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0 || + setsockopt(json_sockfd, SOL_SOCKET, SO_RCVBUF, &opt, sizeof(opt)) < 0) + { + syslog(LOG_DAEMON | LOG_ERR, "setsockopt with SO_RCVBUF/SO_SNDBUF failed: %s", strerror(errno)); + return 1; + } + + opt = 1; if (setsockopt(json_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0 || setsockopt(serv_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { @@ -174,7 +326,9 @@ static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, in if (remotes.desc[i].fd == -1) { remotes.desc_used++; - if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, NETWORK_BUFFER_MAX_SIZE) != 0) + utarray_new(remotes.desc[i].buf_cache, &nDPIsrvd_buffer_array_icd); + if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, NETWORK_BUFFER_MAX_SIZE) != 0 || + remotes.desc[i].buf_cache == NULL) { return NULL; } @@ -187,7 +341,7 @@ static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, in return NULL; } -static int add_event(int epollfd, int fd, void * ptr) +static int add_event(int epollfd, int events, int fd, void * ptr) { struct epoll_event event = {}; @@ -199,10 +353,21 @@ static int add_event(int epollfd, int fd, void * ptr) { event.data.fd = fd; } - event.events = EPOLLIN; + 
event.events = events; + return epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); } +static int add_in_event(int epollfd, int fd, void * ptr) +{ + return add_event(epollfd, EPOLLIN, fd, ptr); +} + +static int add_out_event(int epollfd, int fd, void * ptr) +{ + return add_event(epollfd, EPOLLOUT, fd, ptr); +} + static int del_event(int epollfd, int fd) { return epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL); @@ -227,7 +392,7 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) { int opt; - while ((opt = getopt(argc, argv, "lc:dp:s:u:g:vh")) != -1) + while ((opt = getopt(argc, argv, "lc:dp:s:u:g:C:Dvh")) != -1) { switch (opt) { @@ -257,6 +422,16 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) free(nDPIsrvd_options.group); nDPIsrvd_options.group = strdup(optarg); break; + case 'C': + if (str_value_to_ull(optarg, &nDPIsrvd_options.cache_array_length) != CONVERSION_OK) + { + fprintf(stderr, "%s: Argument for `-C' is not a number: %s\n", argv[0], optarg); + return 1; + } + break; + case 'D': + nDPIsrvd_options.cache_fallback_to_blocking = 0; + break; case 'v': fprintf(stderr, "%s", get_nDPId_version()); return 1; @@ -266,6 +441,7 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) fprintf(stderr, "Usage: %s [-l] [-c path-to-unix-sock] [-d] [-p pidfile]\n" "\t[-s path-to-unix-socket|distributor-host:port] [-u user] [-g group]\n" + "\t[-C cache-array-length] [-D]\n" "\t[-v] [-h]\n", argv[0]); return 1; @@ -363,7 +539,7 @@ static int new_connection(int epollfd, int eventfd) return 1; } - struct remote_desc * current = accept_remote(server_fd, stype, (struct sockaddr *)&sockaddr, &peer_addr_len); + struct remote_desc * const current = accept_remote(server_fd, stype, (struct sockaddr *)&sockaddr, &peer_addr_len); if (current == NULL) { return 1; @@ -402,6 +578,13 @@ static int new_connection(int epollfd, int eventfd) current->event_serv.peer_addr, ntohs(current->event_serv.peer.sin_port)); } + { + struct timeval send_timeout = {1, 0}; + if 
(setsockopt(current->fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&send_timeout, sizeof(send_timeout)) != 0) + { + syslog(LOG_DAEMON | LOG_ERR, "Error setting socket option send timeout: %s", strerror(errno)); + } + } break; } @@ -418,7 +601,7 @@ static int new_connection(int epollfd, int eventfd) { shutdown(current->fd, SHUT_WR); // collector /* setup epoll event */ - if (add_event(epollfd, current->fd, current) != 0) + if (add_in_event(epollfd, current->fd, current) != 0) { disconnect_client(epollfd, current); return 1; @@ -499,7 +682,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) { if (current->sock_type != JSON_SOCK) { - return 0; + return 1; } /* read JSON strings (or parts) from the UNIX socket (collecting) */ @@ -543,42 +726,31 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) { continue; } - if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used) + if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used && + utarray_len(remotes.desc[i].buf_cache) == 0) { - syslog(LOG_DAEMON | LOG_ERR, - "Buffer capacity threshold (%zu of max %zu bytes) reached, " - "falling back to blocking mode.", - remotes.desc[i].buf.used, - remotes.desc[i].buf.max); - /* - * FIXME: Maybe switch to a Multithreading distributor data transmission, - * so that we do not have to switch back to blocking mode here! - * NOTE: If *one* distributer peer is too slow, all other distributors are - * affected by this. This causes starvation and leads to a possible data loss on - * the nDPId collector side. 
- */ - if (fcntl_del_flags(remotes.desc[i].fd, O_NONBLOCK) != 0) + syslog(LOG_DAEMON, "Buffer capacity threshold (%zu bytes) reached, caching.", remotes.desc[i].buf.used); + if (add_to_cache(&remotes.desc[i], remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used) != 0) { - syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno)); disconnect_client(epollfd, &remotes.desc[i]); continue; } - if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used) != - (ssize_t)remotes.desc[i].buf.used) + errno = 0; + if (add_out_event(epollfd, remotes.desc[i].fd, &remotes.desc[i]) != 0) { - syslog(LOG_DAEMON | LOG_ERR, - "Could not drain buffer by %zu bytes. (forced)", - remotes.desc[i].buf.used); + syslog(LOG_DAEMON | LOG_ERR, "%s: %s", "Could not add event, disconnecting", strerror(errno)); disconnect_client(epollfd, &remotes.desc[i]); continue; } - remotes.desc[i].buf.used = 0; - if (fcntl_add_flags(remotes.desc[i].fd, O_NONBLOCK) != 0) + } + + if (utarray_len(remotes.desc[i].buf_cache) > 0) + { + if (add_to_cache(&remotes.desc[i], current->buf.ptr.raw, current->event_json.json_bytes) != 0) { - syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno)); disconnect_client(epollfd, &remotes.desc[i]); - continue; } + continue; } memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used, @@ -649,11 +821,11 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) return 0; } -static int handle_incoming_data_event(int epollfd, struct epoll_event * const event) +static int handle_data_event(int epollfd, struct epoll_event * const event) { struct remote_desc * current = (struct remote_desc *)event->data.ptr; - if ((event->events & EPOLLIN) == 0) + if ((event->events & EPOLLIN) == 0 && (event->events & EPOLLOUT) == 0) { return 1; } @@ -670,7 +842,14 @@ static int handle_incoming_data_event(int epollfd, struct epoll_event * const ev return 1; } - return 
handle_incoming_data(epollfd, current); + if ((event->events & EPOLLIN) != 0) + { + return handle_incoming_data(epollfd, current); + } + else + { + return handle_outgoing_data(epollfd, current); + } } static int setup_signalfd(int epollfd) @@ -693,7 +872,7 @@ static int setup_signalfd(int epollfd) return -1; } - if (add_event(epollfd, sfd, NULL) != 0) + if (add_in_event(epollfd, sfd, NULL) != 0) { return -1; } @@ -762,8 +941,8 @@ static int mainloop(int epollfd) } else { - /* Incoming data. */ - if (handle_incoming_data_event(epollfd, &events[i]) != 0) + /* Incoming data / Outgoing data ready to send. */ + if (handle_data_event(epollfd, &events[i]) != 0) { continue; } @@ -790,13 +969,13 @@ static int setup_event_queue(void) return -1; } - if (add_event(epollfd, json_sockfd, NULL) != 0) + if (add_in_event(epollfd, json_sockfd, NULL) != 0) { syslog(LOG_DAEMON | LOG_ERR, "Error adding JSON fd to epoll: %s", strerror(errno)); return -1; } - if (add_event(epollfd, serv_sockfd, NULL) != 0) + if (add_in_event(epollfd, serv_sockfd, NULL) != 0) { syslog(LOG_DAEMON | LOG_ERR, "Error adding SERV fd to epoll: %s", strerror(errno)); return -1; diff --git a/scripts/daemon.sh b/scripts/daemon.sh index 86e4d5aea..2cd7b4389 100755 --- a/scripts/daemon.sh +++ b/scripts/daemon.sh @@ -18,13 +18,17 @@ if [ -r "/tmp/nDPId-${NSUFFIX}.pid" -o -r "/tmp/nDPIsrvd-${NSUFFIX}.pid" ]; then nDPIsrvd_PID="$(cat "/tmp/nDPIsrvd-${NSUFFIX}.pid" 2>/dev/null)" if [ x"${nDPId_PID}" != x ]; then - sudo kill "${nDPId_PID}" + sudo kill "${nDPId_PID}" 2>/dev/null || true + while ps -p "${nDPId_PID}" > /dev/null; do sleep 1; done + rm -f "/tmp/nDPId-${NSUFFIX}.pid" else printf '%s\n' "${1} not started .." 
>&2 fi if [ x"${nDPIsrvd_PID}" != x ]; then - kill "${nDPIsrvd_PID}" + kill "${nDPIsrvd_PID}" 2>/dev/null || true + while ps -p "${nDPIsrvd_PID}" > /dev/null; do sleep 1; done + rm -f "/tmp/nDPIsrvd-${NSUFFIX}.pid" "/tmp/nDPIsrvd-${NSUFFIX}-collector.sock" "/tmp/nDPIsrvd-${NSUFFIX}-distributor.sock" else printf '%s\n' "${2} not started .." >&2 fi |