aboutsummaryrefslogtreecommitdiff
path: root/nDPIsrvd.c
diff options
context:
space:
mode:
author Toni Uhlig <matzeton@googlemail.com> 2021-08-04 17:09:53 +0200
committer Toni Uhlig <matzeton@googlemail.com> 2021-08-04 17:19:15 +0200
commit d48508b4afe5f3a22c2dda733ee13554d5c5ae60 (patch)
tree 049b84a167350236c52ce956a32eae824a092d43 /nDPIsrvd.c
parent f4c8d96dd93b7cdaafbb5d858268266ef4edb2ae (diff)
Improved nDPIsrvd buffer bloat handling using caching.
* still allow blocking mode (with send timeout) * improved daemon start/stop test script Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r-- nDPIsrvd.c 263
1 files changed, 221 insertions, 42 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index 0e86a84d5..781071fe2 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -29,6 +29,7 @@ struct remote_desc
enum sock_type sock_type;
int fd;
struct nDPIsrvd_buffer buf;
+ UT_array * buf_cache;
union {
struct
{
@@ -67,7 +68,150 @@ static struct
char * serv_optarg;
char * user;
char * group;
-} nDPIsrvd_options = {};
+ nDPIsrvd_ull cache_array_length;
+ int cache_fallback_to_blocking;
+} nDPIsrvd_options = {.cache_array_length = nDPIsrvd_CACHE_ARRAY_LENGTH, .cache_fallback_to_blocking = 1};
+
+static int fcntl_add_flags(int fd, int flags);
+static int fcntl_del_flags(int fd, int flags);
+static void disconnect_client(int epollfd, struct remote_desc * const current);
+static int add_in_event(int epollfd, int fd, void * ptr);
+static int add_out_event(int epollfd, int fd, void * ptr);
+static int del_event(int epollfd, int fd);
+static int drain_cache_blocking(struct remote_desc * const remote);
+
+/*
+ * utarray copy callback: deep-copies the nDPIsrvd_buffer at `src` into the
+ * array slot at `dst`.
+ *
+ * A fresh buffer of `src->used` bytes is initialised for the copy and the
+ * payload is duplicated into it. The copy's JSON string window starts at the
+ * source's `json_string_start` offset and its length is set to `src->used`.
+ *
+ * The callback has no way to report failure. The slot is therefore zeroed up
+ * front, so that a failed buffer initialisation does not leave indeterminate
+ * offsets/lengths behind for the code that later drains the cache.
+ */
+static void nDPIsrvd_buffer_array_copy(void * dst, const void * src)
+{
+    struct nDPIsrvd_buffer * const buf_dst = (struct nDPIsrvd_buffer *)dst;
+    struct nDPIsrvd_buffer const * const buf_src = (struct nDPIsrvd_buffer const *)src;
+
+    /* Give every field a defined value before anything can fail. */
+    memset(buf_dst, 0, sizeof(*buf_dst));
+    if (nDPIsrvd_buffer_init(buf_dst, buf_src->used) != 0)
+    {
+        return;
+    }
+
+    buf_dst->json_string_start = buf_src->json_string_start;
+    buf_dst->json_string_length = buf_src->used;
+    buf_dst->json_string = buf_dst->ptr.text + buf_dst->json_string_start;
+    buf_dst->used = buf_src->used;
+    /* memcpy() must not be handed NULL pointers, not even for a zero length. */
+    if (buf_src->used > 0)
+    {
+        memcpy(buf_dst->ptr.raw, buf_src->ptr.raw, buf_src->used);
+    }
+}
+
+/* utarray destructor callback: hands the array element `elt` to nDPIsrvd_buffer_free(). */
+static void nDPIsrvd_buffer_array_dtor(void * elt)
+{
+    nDPIsrvd_buffer_free((struct nDPIsrvd_buffer *)elt);
+}
+
+/*
+ * utarray element descriptor for arrays of struct nDPIsrvd_buffer:
+ * no init callback, deep copy on insertion, buffer release on removal.
+ */
+static const UT_icd nDPIsrvd_buffer_array_icd = {.sz = sizeof(struct nDPIsrvd_buffer),
+                                                 .init = NULL,
+                                                 .copy = nDPIsrvd_buffer_array_copy,
+                                                 .dtor = nDPIsrvd_buffer_array_dtor};
+
+/*
+ * Append a copy of `siz` bytes starting at `buf` to the buffer cache of
+ * `remote`.
+ *
+ * If the cache already holds `nDPIsrvd_options.cache_array_length` entries,
+ * the call either fails (blocking fallback disabled) or first tries to drain
+ * the cache with blocking I/O.
+ *
+ * On success `remote->buf.used` is reset to 0.
+ *
+ * Returns 0 on success, -1 if the cache limit was hit and could not be
+ * relieved, or if the copy of the data could not be allocated.
+ */
+static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, size_t siz)
+{
+    struct nDPIsrvd_buffer buf_src = {};
+    struct nDPIsrvd_buffer * cached;
+
+    if (utarray_len(remote->buf_cache) >= nDPIsrvd_options.cache_array_length)
+    {
+        if (nDPIsrvd_options.cache_fallback_to_blocking == 0)
+        {
+            syslog(LOG_DAEMON | LOG_ERR,
+                   "Buffer cache limit (%u lines) reached, remote too slow.",
+                   utarray_len(remote->buf_cache));
+            return -1;
+        }
+        else
+        {
+            syslog(LOG_DAEMON | LOG_ERR,
+                   "Buffer cache limit (%u lines) reached, falling back to blocking I/O.",
+                   utarray_len(remote->buf_cache));
+            if (drain_cache_blocking(remote) != 0)
+            {
+                syslog(LOG_DAEMON | LOG_ERR, "Could not drain buffer cache in blocking I/O: %s", strerror(errno));
+                return -1;
+            }
+        }
+    }
+
+    /* Only a shallow descriptor is built here; the array's copy callback duplicates the data. */
+    buf_src.ptr.raw = buf;
+    buf_src.json_string_length = buf_src.used = siz;
+    utarray_push_back(remote->buf_cache, &buf_src);
+
+    /*
+     * The copy callback returns void, so a failed allocation is only visible
+     * as a NULL data pointer in the element that was just pushed. Remove such
+     * an element again instead of caching it and reporting success.
+     */
+    cached = (struct nDPIsrvd_buffer *)utarray_back(remote->buf_cache);
+    if (cached == NULL || (siz > 0 && cached->ptr.raw == NULL))
+    {
+        syslog(LOG_DAEMON | LOG_ERR, "Could not allocate %zu bytes for the buffer cache.", siz);
+        if (cached != NULL)
+        {
+            utarray_pop_back(remote->buf_cache);
+        }
+        return -1;
+    }
+
+    remote->buf.used = 0;
+
+    return 0;
+}
+
+/*
+ * Write cached buffers of `remote` to its socket, oldest first, removing each
+ * buffer from the cache once it has been written completely. Partial writes
+ * are tracked by advancing the buffer's JSON string window.
+ *
+ * Returns 0 if the cache was emptied or the socket cannot take more data
+ * right now (EAGAIN / EWOULDBLOCK; the remaining data stays cached).
+ * Returns -1 on any other write error or if write() returned 0; `errno` is
+ * left as set by write().
+ */
+static int drain_cache(struct remote_desc * const remote)
+{
+    errno = 0;
+
+    while (utarray_len(remote->buf_cache) > 0)
+    {
+        struct nDPIsrvd_buffer * const buf = (struct nDPIsrvd_buffer *)utarray_front(remote->buf_cache);
+        ssize_t const written = write(remote->fd, buf->ptr.raw + buf->json_string_start, buf->json_string_length);
+
+        if (written < 0)
+        {
+            if (errno == EINTR)
+            {
+                /* Interrupted before any byte was transferred: not an error, just retry. */
+                continue;
+            }
+            if (errno == EAGAIN || errno == EWOULDBLOCK)
+            {
+                return 0;
+            }
+            return -1;
+        }
+        if (written == 0)
+        {
+            return -1;
+        }
+
+        buf->json_string_start += written;
+        buf->json_string_length -= written;
+        if (buf->json_string_length == 0)
+        {
+            utarray_erase(remote->buf_cache, 0, 1);
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Drain the buffer cache of `remote` with O_NONBLOCK temporarily cleared on
+ * its socket; the flag is set again afterwards regardless of the outcome.
+ *
+ * Returns 0 if drain_cache() succeeded and the flags could be switched both
+ * ways, -1 otherwise. When the drain itself failed, `errno` on return is the
+ * value drain_cache() left behind, so callers can log it.
+ */
+static int drain_cache_blocking(struct remote_desc * const remote)
+{
+    int retval = 0;
+    int drain_errno = 0;
+
+    if (fcntl_del_flags(remote->fd, O_NONBLOCK) != 0)
+    {
+        syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno));
+        return -1;
+    }
+    if (drain_cache(remote) != 0)
+    {
+        retval = -1;
+        /* Remember the cause now; the fcntl() calls below may overwrite errno. */
+        drain_errno = errno;
+    }
+    if (fcntl_add_flags(remote->fd, O_NONBLOCK) != 0)
+    {
+        syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno));
+        return -1;
+    }
+
+    if (retval != 0)
+    {
+        errno = drain_errno;
+    }
+
+    return retval;
+}
+
+/*
+ * Handle a "socket writable" notification for `remote`: try to flush its
+ * buffer cache.
+ *
+ * Only SERV_SOCK remotes are accepted. The fd is removed from the epoll set
+ * only once the cache is completely empty. drain_cache() also returns 0 when
+ * the socket would block with data still cached; in that case the
+ * registration must stay in place, otherwise nothing would trigger another
+ * flush attempt for the remaining data.
+ *
+ * Returns 0 on success, -1 for a wrong socket type or a failed drain.
+ */
+static int handle_outgoing_data(int epollfd, struct remote_desc * const remote)
+{
+    if (remote->sock_type != SERV_SOCK)
+    {
+        return -1;
+    }
+    if (drain_cache(remote) != 0)
+    {
+        return -1;
+    }
+    if (utarray_len(remote->buf_cache) == 0)
+    {
+        del_event(epollfd, remote->fd);
+    }
+
+    return 0;
+}
static int fcntl_add_flags(int fd, int flags)
{
@@ -87,7 +231,7 @@ static int fcntl_del_flags(int fd, int flags)
if (cur_flags == -1)
{
- return 1;
+ return -1;
}
return fcntl(fd, F_SETFL, cur_flags & ~flags);
@@ -103,7 +247,15 @@ static int create_listen_sockets(void)
return 1;
}
- int opt = 1;
+ int opt = NETWORK_BUFFER_MAX_SIZE * 16;
+ if (setsockopt(serv_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0 ||
+ setsockopt(json_sockfd, SOL_SOCKET, SO_RCVBUF, &opt, sizeof(opt)) < 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "setsockopt with SO_RCVBUF/SO_SNDBUF failed: %s", strerror(errno));
+ return 1;
+ }
+
+ opt = 1;
if (setsockopt(json_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0 ||
setsockopt(serv_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
{
@@ -174,7 +326,9 @@ static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, in
if (remotes.desc[i].fd == -1)
{
remotes.desc_used++;
- if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, NETWORK_BUFFER_MAX_SIZE) != 0)
+ utarray_new(remotes.desc[i].buf_cache, &nDPIsrvd_buffer_array_icd);
+ if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, NETWORK_BUFFER_MAX_SIZE) != 0 ||
+ remotes.desc[i].buf_cache == NULL)
{
return NULL;
}
@@ -187,7 +341,7 @@ static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, in
return NULL;
}
-static int add_event(int epollfd, int fd, void * ptr)
+static int add_event(int epollfd, int events, int fd, void * ptr)
{
struct epoll_event event = {};
@@ -199,10 +353,21 @@ static int add_event(int epollfd, int fd, void * ptr)
{
event.data.fd = fd;
}
- event.events = EPOLLIN;
+ event.events = events;
+
return epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
}
+/* Register `fd` (with optional user pointer `ptr`) on `epollfd` for EPOLLIN events via add_event(). */
+static int add_in_event(int epollfd, int fd, void * ptr)
+{
+    int const rc = add_event(epollfd, EPOLLIN, fd, ptr);
+
+    return rc;
+}
+
+/* Register `fd` (with optional user pointer `ptr`) on `epollfd` for EPOLLOUT events via add_event(). */
+static int add_out_event(int epollfd, int fd, void * ptr)
+{
+    int const rc = add_event(epollfd, EPOLLOUT, fd, ptr);
+
+    return rc;
+}
+
static int del_event(int epollfd, int fd)
{
return epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);
@@ -227,7 +392,7 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
{
int opt;
- while ((opt = getopt(argc, argv, "lc:dp:s:u:g:vh")) != -1)
+ while ((opt = getopt(argc, argv, "lc:dp:s:u:g:C:Dvh")) != -1)
{
switch (opt)
{
@@ -257,6 +422,16 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
free(nDPIsrvd_options.group);
nDPIsrvd_options.group = strdup(optarg);
break;
+ case 'C':
+ if (str_value_to_ull(optarg, &nDPIsrvd_options.cache_array_length) != CONVERSION_OK)
+ {
+ fprintf(stderr, "%s: Argument for `-C' is not a number: %s\n", argv[0], optarg);
+ return 1;
+ }
+ break;
+ case 'D':
+ nDPIsrvd_options.cache_fallback_to_blocking = 0;
+ break;
case 'v':
fprintf(stderr, "%s", get_nDPId_version());
return 1;
@@ -266,6 +441,7 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
fprintf(stderr,
"Usage: %s [-l] [-c path-to-unix-sock] [-d] [-p pidfile]\n"
"\t[-s path-to-unix-socket|distributor-host:port] [-u user] [-g group]\n"
+ "\t[-C cache-array-length] [-D]\n"
"\t[-v] [-h]\n",
argv[0]);
return 1;
@@ -363,7 +539,7 @@ static int new_connection(int epollfd, int eventfd)
return 1;
}
- struct remote_desc * current = accept_remote(server_fd, stype, (struct sockaddr *)&sockaddr, &peer_addr_len);
+ struct remote_desc * const current = accept_remote(server_fd, stype, (struct sockaddr *)&sockaddr, &peer_addr_len);
if (current == NULL)
{
return 1;
@@ -402,6 +578,13 @@ static int new_connection(int epollfd, int eventfd)
current->event_serv.peer_addr,
ntohs(current->event_serv.peer.sin_port));
}
+ {
+ struct timeval send_timeout = {1, 0};
+ if (setsockopt(current->fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&send_timeout, sizeof(send_timeout)) != 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error setting socket option send timeout: %s", strerror(errno));
+ }
+ }
break;
}
@@ -418,7 +601,7 @@ static int new_connection(int epollfd, int eventfd)
{
shutdown(current->fd, SHUT_WR); // collector
/* setup epoll event */
- if (add_event(epollfd, current->fd, current) != 0)
+ if (add_in_event(epollfd, current->fd, current) != 0)
{
disconnect_client(epollfd, current);
return 1;
@@ -499,7 +682,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
{
if (current->sock_type != JSON_SOCK)
{
- return 0;
+ return 1;
}
/* read JSON strings (or parts) from the UNIX socket (collecting) */
@@ -543,42 +726,31 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
{
continue;
}
- if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used)
+ if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used &&
+ utarray_len(remotes.desc[i].buf_cache) == 0)
{
- syslog(LOG_DAEMON | LOG_ERR,
- "Buffer capacity threshold (%zu of max %zu bytes) reached, "
- "falling back to blocking mode.",
- remotes.desc[i].buf.used,
- remotes.desc[i].buf.max);
- /*
- * FIXME: Maybe switch to a Multithreading distributor data transmission,
- * so that we do not have to switch back to blocking mode here!
- * NOTE: If *one* distributer peer is too slow, all other distributors are
- * affected by this. This causes starvation and leads to a possible data loss on
- * the nDPId collector side.
- */
- if (fcntl_del_flags(remotes.desc[i].fd, O_NONBLOCK) != 0)
+ syslog(LOG_DAEMON, "Buffer capacity threshold (%zu bytes) reached, caching.", remotes.desc[i].buf.used);
+ if (add_to_cache(&remotes.desc[i], remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used) != 0)
{
- syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno));
disconnect_client(epollfd, &remotes.desc[i]);
continue;
}
- if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used) !=
- (ssize_t)remotes.desc[i].buf.used)
+ errno = 0;
+ if (add_out_event(epollfd, remotes.desc[i].fd, &remotes.desc[i]) != 0)
{
- syslog(LOG_DAEMON | LOG_ERR,
- "Could not drain buffer by %zu bytes. (forced)",
- remotes.desc[i].buf.used);
+ syslog(LOG_DAEMON | LOG_ERR, "%s: %s", "Could not add event, disconnecting", strerror(errno));
disconnect_client(epollfd, &remotes.desc[i]);
continue;
}
- remotes.desc[i].buf.used = 0;
- if (fcntl_add_flags(remotes.desc[i].fd, O_NONBLOCK) != 0)
+ }
+
+ if (utarray_len(remotes.desc[i].buf_cache) > 0)
+ {
+ if (add_to_cache(&remotes.desc[i], current->buf.ptr.raw, current->event_json.json_bytes) != 0)
{
- syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno));
disconnect_client(epollfd, &remotes.desc[i]);
- continue;
}
+ continue;
}
memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used,
@@ -649,11 +821,11 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
return 0;
}
-static int handle_incoming_data_event(int epollfd, struct epoll_event * const event)
+static int handle_data_event(int epollfd, struct epoll_event * const event)
{
struct remote_desc * current = (struct remote_desc *)event->data.ptr;
- if ((event->events & EPOLLIN) == 0)
+ if ((event->events & EPOLLIN) == 0 && (event->events & EPOLLOUT) == 0)
{
return 1;
}
@@ -670,7 +842,14 @@ static int handle_incoming_data_event(int epollfd, struct epoll_event * const ev
return 1;
}
- return handle_incoming_data(epollfd, current);
+ if ((event->events & EPOLLIN) != 0)
+ {
+ return handle_incoming_data(epollfd, current);
+ }
+ else
+ {
+ return handle_outgoing_data(epollfd, current);
+ }
}
static int setup_signalfd(int epollfd)
@@ -693,7 +872,7 @@ static int setup_signalfd(int epollfd)
return -1;
}
- if (add_event(epollfd, sfd, NULL) != 0)
+ if (add_in_event(epollfd, sfd, NULL) != 0)
{
return -1;
}
@@ -762,8 +941,8 @@ static int mainloop(int epollfd)
}
else
{
- /* Incoming data. */
- if (handle_incoming_data_event(epollfd, &events[i]) != 0)
+ /* Incoming data / Outgoing data ready to send. */
+ if (handle_data_event(epollfd, &events[i]) != 0)
{
continue;
}
@@ -790,13 +969,13 @@ static int setup_event_queue(void)
return -1;
}
- if (add_event(epollfd, json_sockfd, NULL) != 0)
+ if (add_in_event(epollfd, json_sockfd, NULL) != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "Error adding JSON fd to epoll: %s", strerror(errno));
return -1;
}
- if (add_event(epollfd, serv_sockfd, NULL) != 0)
+ if (add_in_event(epollfd, serv_sockfd, NULL) != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "Error adding SERV fd to epoll: %s", strerror(errno));
return -1;