diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2021-08-04 17:09:53 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2021-08-04 17:19:15 +0200 |
commit | d48508b4afe5f3a22c2dda733ee13554d5c5ae60 (patch) | |
tree | 049b84a167350236c52ce956a32eae824a092d43 /nDPIsrvd.c | |
parent | f4c8d96dd93b7cdaafbb5d858268266ef4edb2ae (diff) |
Improved nDPIsrvd buffer bloat handling using caching.
* still allow blocking mode (with send timeout)
* improved daemon start/stop test script
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r-- | nDPIsrvd.c | 263 |
1 files changed, 221 insertions, 42 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 0e86a84d5..781071fe2 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -29,6 +29,7 @@ struct remote_desc enum sock_type sock_type; int fd; struct nDPIsrvd_buffer buf; + UT_array * buf_cache; union { struct { @@ -67,7 +68,150 @@ static struct char * serv_optarg; char * user; char * group; -} nDPIsrvd_options = {}; + nDPIsrvd_ull cache_array_length; + int cache_fallback_to_blocking; +} nDPIsrvd_options = {.cache_array_length = nDPIsrvd_CACHE_ARRAY_LENGTH, .cache_fallback_to_blocking = 1}; + +static int fcntl_add_flags(int fd, int flags); +static int fcntl_del_flags(int fd, int flags); +static void disconnect_client(int epollfd, struct remote_desc * const current); +static int add_in_event(int epollfd, int fd, void * ptr); +static int add_out_event(int epollfd, int fd, void * ptr); +static int del_event(int epollfd, int fd); +static int drain_cache_blocking(struct remote_desc * const remote); + +static void nDPIsrvd_buffer_array_copy(void * dst, const void * src) +{ + struct nDPIsrvd_buffer * const buf_dst = (struct nDPIsrvd_buffer *)dst; + struct nDPIsrvd_buffer const * const buf_src = (struct nDPIsrvd_buffer *)src; + + buf_dst->ptr.raw = NULL; + if (nDPIsrvd_buffer_init(buf_dst, buf_src->used) != 0) + { + return; + } + + buf_dst->json_string_start = buf_src->json_string_start; + buf_dst->json_string_length = buf_src->used; + buf_dst->json_string = buf_dst->ptr.text + buf_dst->json_string_start; + buf_dst->used = buf_src->used; + memcpy(buf_dst->ptr.raw, buf_src->ptr.raw, buf_src->used); +} + +static void nDPIsrvd_buffer_array_dtor(void * elt) +{ + struct nDPIsrvd_buffer * const buf = (struct nDPIsrvd_buffer *)elt; + + nDPIsrvd_buffer_free(buf); +} + +static const UT_icd nDPIsrvd_buffer_array_icd = {sizeof(struct nDPIsrvd_buffer), + NULL, + nDPIsrvd_buffer_array_copy, + nDPIsrvd_buffer_array_dtor}; + +static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, size_t siz) +{ + struct nDPIsrvd_buffer buf_src = {}; + + if (utarray_len(remote->buf_cache) >= nDPIsrvd_options.cache_array_length) + { + if (nDPIsrvd_options.cache_fallback_to_blocking == 0) + { + syslog(LOG_DAEMON | LOG_ERR, + "Buffer cache limit (%u lines) reached, remote too slow.", + utarray_len(remote->buf_cache)); + return -1; + } + else + { + syslog(LOG_DAEMON | LOG_ERR, + "Buffer cache limit (%u lines) reached, falling back to blocking I/O.", + utarray_len(remote->buf_cache)); + if (drain_cache_blocking(remote) != 0) + { + syslog(LOG_DAEMON | LOG_ERR, "Could not drain buffer cache in blocking I/O: %s", strerror(errno)); + return -1; + } + } + } + + buf_src.ptr.raw = buf; + buf_src.json_string_length = buf_src.used = siz; + utarray_push_back(remote->buf_cache, &buf_src); + remote->buf.used = 0; + + return 0; +} + +static int drain_cache(struct remote_desc * const remote) +{ + errno = 0; + + while (utarray_len(remote->buf_cache) > 0) + { + struct nDPIsrvd_buffer * buf = (struct nDPIsrvd_buffer *)utarray_front(remote->buf_cache); + ssize_t written = write(remote->fd, buf->ptr.raw + buf->json_string_start, buf->json_string_length); + switch (written) + { + case -1: + if (errno == EAGAIN) + { + return 0; + } + return -1; + case 0: + return -1; + default: + buf->json_string_start += written; + buf->json_string_length -= written; + if (buf->json_string_length == 0) + { + utarray_erase(remote->buf_cache, 0, 1); + } + break; + } + } + + return 0; +} + +static int drain_cache_blocking(struct remote_desc * const remote) +{ + int retval = 0; + + if (fcntl_del_flags(remote->fd, O_NONBLOCK) != 0) + { + syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno)); + return -1; + } + if (drain_cache(remote) != 0) + { + retval = -1; + } + if (fcntl_add_flags(remote->fd, O_NONBLOCK) != 0) + { + syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno)); + return -1; + } + + return retval; +} + +static int handle_outgoing_data(int epollfd, struct remote_desc * const remote) +{ + if (remote->sock_type != SERV_SOCK) + { + return -1; + } + if (drain_cache(remote) != 0) + { + return -1; + } + del_event(epollfd, remote->fd); + + return 0; +} static int fcntl_add_flags(int fd, int flags) { @@ -87,7 +231,7 @@ static int fcntl_del_flags(int fd, int flags) if (cur_flags == -1) { - return 1; + return -1; } return fcntl(fd, F_SETFL, cur_flags & ~flags); @@ -103,7 +247,15 @@ static int create_listen_sockets(void) return 1; } - int opt = 1; + int opt = NETWORK_BUFFER_MAX_SIZE * 16; + if (setsockopt(serv_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0 || + setsockopt(json_sockfd, SOL_SOCKET, SO_RCVBUF, &opt, sizeof(opt)) < 0) + { + syslog(LOG_DAEMON | LOG_ERR, "setsockopt with SO_RCVBUF/SO_SNDBUF failed: %s", strerror(errno)); + return 1; + } + + opt = 1; if (setsockopt(json_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0 || setsockopt(serv_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { @@ -174,7 +326,9 @@ static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, in if (remotes.desc[i].fd == -1) { remotes.desc_used++; - if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, NETWORK_BUFFER_MAX_SIZE) != 0) + utarray_new(remotes.desc[i].buf_cache, &nDPIsrvd_buffer_array_icd); + if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, NETWORK_BUFFER_MAX_SIZE) != 0 || + remotes.desc[i].buf_cache == NULL) { return NULL; } @@ -187,7 +341,7 @@ static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, in return NULL; } -static int add_event(int epollfd, int fd, void * ptr) +static int add_event(int epollfd, int events, int fd, void * ptr) { struct epoll_event event = {}; @@ -199,10 +353,21 @@ static int add_event(int epollfd, int fd, void * ptr) { event.data.fd = fd; } - event.events = EPOLLIN; + event.events = events; + return epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); } +static int add_in_event(int epollfd, int fd, void * ptr) +{ + return add_event(epollfd, EPOLLIN, fd, ptr); +} + +static int add_out_event(int epollfd, int fd, void * ptr) +{ + return add_event(epollfd, EPOLLOUT, fd, ptr); +} + static int del_event(int epollfd, int fd) { return epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL); @@ -227,7 +392,7 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) { int opt; - while ((opt = getopt(argc, argv, "lc:dp:s:u:g:vh")) != -1) + while ((opt = getopt(argc, argv, "lc:dp:s:u:g:C:Dvh")) != -1) { switch (opt) { @@ -257,6 +422,16 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) free(nDPIsrvd_options.group); nDPIsrvd_options.group = strdup(optarg); break; + case 'C': + if (str_value_to_ull(optarg, &nDPIsrvd_options.cache_array_length) != CONVERSION_OK) + { + fprintf(stderr, "%s: Argument for `-C' is not a number: %s\n", argv[0], optarg); + return 1; + } + break; + case 'D': + nDPIsrvd_options.cache_fallback_to_blocking = 0; + break; case 'v': fprintf(stderr, "%s", get_nDPId_version()); return 1; @@ -266,6 +441,7 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) fprintf(stderr, "Usage: %s [-l] [-c path-to-unix-sock] [-d] [-p pidfile]\n" "\t[-s path-to-unix-socket|distributor-host:port] [-u user] [-g group]\n" + "\t[-C cache-array-length] [-D]\n" "\t[-v] [-h]\n", argv[0]); return 1; @@ -363,7 +539,7 @@ static int new_connection(int epollfd, int eventfd) return 1; } - struct remote_desc * current = accept_remote(server_fd, stype, (struct sockaddr *)&sockaddr, &peer_addr_len); + struct remote_desc * const current = accept_remote(server_fd, stype, (struct sockaddr *)&sockaddr, &peer_addr_len); if (current == NULL) { return 1; @@ -402,6 +578,13 @@ static int new_connection(int epollfd, int eventfd) current->event_serv.peer_addr, ntohs(current->event_serv.peer.sin_port)); } + { + struct timeval send_timeout = {1, 0}; + if (setsockopt(current->fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&send_timeout, sizeof(send_timeout)) != 0) + { + syslog(LOG_DAEMON | LOG_ERR, "Error setting socket option send timeout: %s", strerror(errno)); + } + } break; } @@ -418,7 +601,7 @@ static int new_connection(int epollfd, int eventfd) { shutdown(current->fd, SHUT_WR); // collector /* setup epoll event */ - if (add_event(epollfd, current->fd, current) != 0) + if (add_in_event(epollfd, current->fd, current) != 0) { disconnect_client(epollfd, current); return 1; @@ -499,7 +682,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) { if (current->sock_type != JSON_SOCK) { - return 0; + return 1; } /* read JSON strings (or parts) from the UNIX socket (collecting) */ @@ -543,42 +726,31 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) { continue; } - if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used) + if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used && + utarray_len(remotes.desc[i].buf_cache) == 0) { - syslog(LOG_DAEMON | LOG_ERR, - "Buffer capacity threshold (%zu of max %zu bytes) reached, " - "falling back to blocking mode.", - remotes.desc[i].buf.used, - remotes.desc[i].buf.max); - /* - * FIXME: Maybe switch to a Multithreading distributor data transmission, - * so that we do not have to switch back to blocking mode here! - * NOTE: If *one* distributer peer is too slow, all other distributors are - * affected by this. This causes starvation and leads to a possible data loss on - * the nDPId collector side. - */ - if (fcntl_del_flags(remotes.desc[i].fd, O_NONBLOCK) != 0) + syslog(LOG_DAEMON, "Buffer capacity threshold (%zu bytes) reached, caching.", remotes.desc[i].buf.used); + if (add_to_cache(&remotes.desc[i], remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used) != 0) { - syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno)); disconnect_client(epollfd, &remotes.desc[i]); continue; } - if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used) != - (ssize_t)remotes.desc[i].buf.used) + errno = 0; + if (add_out_event(epollfd, remotes.desc[i].fd, &remotes.desc[i]) != 0) { - syslog(LOG_DAEMON | LOG_ERR, - "Could not drain buffer by %zu bytes. (forced)", - remotes.desc[i].buf.used); + syslog(LOG_DAEMON | LOG_ERR, "%s: %s", "Could not add event, disconnecting", strerror(errno)); disconnect_client(epollfd, &remotes.desc[i]); continue; } - remotes.desc[i].buf.used = 0; - if (fcntl_add_flags(remotes.desc[i].fd, O_NONBLOCK) != 0) + } + + if (utarray_len(remotes.desc[i].buf_cache) > 0) + { + if (add_to_cache(&remotes.desc[i], current->buf.ptr.raw, current->event_json.json_bytes) != 0) { - syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno)); disconnect_client(epollfd, &remotes.desc[i]); - continue; } + continue; } memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used, @@ -649,11 +821,11 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) return 0; } -static int handle_incoming_data_event(int epollfd, struct epoll_event * const event) +static int handle_data_event(int epollfd, struct epoll_event * const event) { struct remote_desc * current = (struct remote_desc *)event->data.ptr; - if ((event->events & EPOLLIN) == 0) + if ((event->events & EPOLLIN) == 0 && (event->events & EPOLLOUT) == 0) { return 1; } @@ -670,7 +842,14 @@ static int handle_incoming_data_event(int epollfd, struct epoll_event * const ev return 1; } - return handle_incoming_data(epollfd, current); + if ((event->events & EPOLLIN) != 0) + { + return handle_incoming_data(epollfd, current); + } + else + { + return handle_outgoing_data(epollfd, current); + } } static int setup_signalfd(int epollfd) @@ -693,7 +872,7 @@ static int setup_signalfd(int epollfd) return -1; } - if (add_event(epollfd, sfd, NULL) != 0) + if (add_in_event(epollfd, sfd, NULL) != 0) { return -1; } @@ -762,8 +941,8 @@ static int mainloop(int epollfd) } else { - /* Incoming data. */ - if (handle_incoming_data_event(epollfd, &events[i]) != 0) + /* Incoming data / Outoing data ready to send. */ + if (handle_data_event(epollfd, &events[i]) != 0) { continue; } @@ -790,13 +969,13 @@ static int setup_event_queue(void) return -1; } - if (add_event(epollfd, json_sockfd, NULL) != 0) + if (add_in_event(epollfd, json_sockfd, NULL) != 0) { syslog(LOG_DAEMON | LOG_ERR, "Error adding JSON fd to epoll: %s", strerror(errno)); return -1; } - if (add_event(epollfd, serv_sockfd, NULL) != 0) + if (add_in_event(epollfd, serv_sockfd, NULL) != 0) { syslog(LOG_DAEMON | LOG_ERR, "Error adding SERV fd to epoll: %s", strerror(errno)); return -1; |