aboutsummaryrefslogtreecommitdiff
path: root/nDPIsrvd.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2021-08-05 01:14:04 +0200
committerToni Uhlig <matzeton@googlemail.com>2021-08-05 02:02:51 +0200
commit6faded3cc7084cb898773dafc1ca9422242f9c81 (patch)
tree7beae11228ee671362af1e88f397bd80ead59e3b /nDPIsrvd.c
parentd48508b4afe5f3a22c2dda733ee13554d5c5ae60 (diff)
Improved and Fixed another buffering issue caused by removing an outgoing fd too early from epoll queue (EPOLLOUT).
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r--nDPIsrvd.c186
1 files changed, 105 insertions, 81 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index 781071fe2..a63a410f1 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -126,7 +126,7 @@ static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf,
else
{
syslog(LOG_DAEMON | LOG_ERR,
- "Buffer cache limit (%u lines) reached, falling back to blocking I/O.",
+ "Buffer JSON string cache limit (%u lines) reached, falling back to blocking I/O.",
utarray_len(remote->buf_cache));
if (drain_cache_blocking(remote) != 0)
{
@@ -144,10 +144,79 @@ static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf,
return 0;
}
+static int drain_main_buffer(struct remote_desc * const remote)
+{
+ if (remote->buf.used == 0)
+ {
+ return 0;
+ }
+
+ errno = 0;
+ ssize_t bytes_written = write(remote->fd, remote->buf.ptr.raw, remote->buf.used);
+ if (errno == EAGAIN)
+ {
+ return 0;
+ }
+ if (bytes_written < 0 || errno != 0)
+ {
+ if (remote->event_serv.peer_addr[0] == '\0')
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Distributor connection closed, send failed: %s", strerror(errno));
+ }
+ else
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Distributor connection to %.*s:%u closed, send failed: %s",
+ (int)sizeof(remote->event_serv.peer_addr),
+ remote->event_serv.peer_addr,
+ ntohs(remote->event_serv.peer.sin_port),
+ strerror(errno));
+ }
+ return -1;
+ }
+ if (bytes_written == 0)
+ {
+ if (remote->event_serv.peer_addr[0] == '\0')
+ {
+ syslog(LOG_DAEMON, "%s", "Distributor connection closed during write");
+ }
+ else
+ {
+ syslog(LOG_DAEMON,
+ "Distributor connection to %.*s:%u closed during write",
+ (int)sizeof(remote->event_serv.peer_addr),
+ remote->event_serv.peer_addr,
+ ntohs(remote->event_serv.peer.sin_port));
+ }
+ return -1;
+ }
+ if ((size_t)bytes_written < remote->buf.used)
+ {
+ syslog(LOG_DAEMON,
+ "Distributor wrote less than expected to %.*s:%u: %zd < %zu",
+ (int)sizeof(remote->event_serv.peer_addr),
+ remote->event_serv.peer_addr,
+ ntohs(remote->event_serv.peer.sin_port),
+ bytes_written,
+ remote->buf.used);
+ memmove(remote->buf.ptr.raw, remote->buf.ptr.raw + bytes_written, remote->buf.used - bytes_written);
+ remote->buf.used -= bytes_written;
+ return -1;
+ }
+
+ remote->buf.used = 0;
+ return 0;
+}
+
static int drain_cache(struct remote_desc * const remote)
{
errno = 0;
+ if (drain_main_buffer(remote) != 0)
+ {
+ return -1;
+ }
+
while (utarray_len(remote->buf_cache) > 0)
{
struct nDPIsrvd_buffer * buf = (struct nDPIsrvd_buffer *)utarray_front(remote->buf_cache);
@@ -206,9 +275,13 @@ static int handle_outgoing_data(int epollfd, struct remote_desc * const remote)
}
if (drain_cache(remote) != 0)
{
+ disconnect_client(epollfd, remote);
return -1;
}
- del_event(epollfd, remote->fd);
+ if (utarray_len(remote->buf_cache) == 0)
+ {
+ del_event(epollfd, remote->fd);
+ }
return 0;
}
@@ -247,20 +320,11 @@ static int create_listen_sockets(void)
return 1;
}
- int opt = NETWORK_BUFFER_MAX_SIZE * 16;
- if (setsockopt(serv_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0 ||
- setsockopt(json_sockfd, SOL_SOCKET, SO_RCVBUF, &opt, sizeof(opt)) < 0)
- {
- syslog(LOG_DAEMON | LOG_ERR, "setsockopt with SO_RCVBUF/SO_SNDBUF failed: %s", strerror(errno));
- return 1;
- }
-
- opt = 1;
+ int opt = 1;
if (setsockopt(json_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0 ||
setsockopt(serv_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
{
syslog(LOG_DAEMON | LOG_ERR, "setsockopt with SO_REUSEADDR failed: %s", strerror(errno));
- return 1;
}
struct sockaddr_un json_addr;
@@ -546,12 +610,18 @@ static int new_connection(int epollfd, int eventfd)
}
char const * sock_type = NULL;
+ int sockopt = NETWORK_BUFFER_MAX_SIZE;
switch (current->sock_type)
{
case JSON_SOCK:
sock_type = "collector";
current->event_json.json_bytes = 0;
syslog(LOG_DAEMON, "New collector connection");
+
+ if (setsockopt(current->fd, SOL_SOCKET, SO_RCVBUF, &sockopt, sizeof(sockopt)) < 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error setting socket option SO_RCVBUF: %s", strerror(errno));
+ }
break;
case SERV_SOCK:
sock_type = "distributor";
@@ -584,6 +654,10 @@ static int new_connection(int epollfd, int eventfd)
{
syslog(LOG_DAEMON | LOG_ERR, "Error setting socket option send timeout: %s", strerror(errno));
}
+ if (setsockopt(current->fd, SOL_SOCKET, SO_SNDBUF, &sockopt, sizeof(sockopt)) < 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error setting socket option SO_SNDBUF: %s", strerror(errno));
+ }
}
break;
}
@@ -726,89 +800,39 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
{
continue;
}
- if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used &&
- utarray_len(remotes.desc[i].buf_cache) == 0)
+ if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used ||
+ utarray_len(remotes.desc[i].buf_cache) > 0)
{
- syslog(LOG_DAEMON, "Buffer capacity threshold (%zu bytes) reached, caching.", remotes.desc[i].buf.used);
- if (add_to_cache(&remotes.desc[i], remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used) != 0)
- {
- disconnect_client(epollfd, &remotes.desc[i]);
- continue;
- }
- errno = 0;
- if (add_out_event(epollfd, remotes.desc[i].fd, &remotes.desc[i]) != 0)
+ if (utarray_len(remotes.desc[i].buf_cache) == 0)
{
- syslog(LOG_DAEMON | LOG_ERR, "%s: %s", "Could not add event, disconnecting", strerror(errno));
- disconnect_client(epollfd, &remotes.desc[i]);
- continue;
+ syslog(LOG_DAEMON, "Buffer capacity threshold (%zu bytes) reached, caching JSON strings.", remotes.desc[i].buf.used);
+ errno = 0;
+ if (add_out_event(epollfd, remotes.desc[i].fd, &remotes.desc[i]) != 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "%s: %s", "Could not add event, disconnecting", strerror(errno));
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
}
- }
- if (utarray_len(remotes.desc[i].buf_cache) > 0)
- {
if (add_to_cache(&remotes.desc[i], current->buf.ptr.raw, current->event_json.json_bytes) != 0)
{
disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
}
- continue;
}
-
- memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used,
- current->buf.ptr.raw,
- current->event_json.json_bytes);
- remotes.desc[i].buf.used += current->event_json.json_bytes;
-
- errno = 0;
- ssize_t bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used);
- if (errno == EAGAIN)
- {
- continue;
- }
- if (bytes_written < 0 || errno != 0)
+ else
{
- if (remotes.desc[i].event_serv.peer_addr[0] == '\0')
- {
- syslog(LOG_DAEMON | LOG_ERR, "Distributor connection closed, send failed: %s", strerror(errno));
- }
- else
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "Distributor connection to %.*s:%u closed, send failed: %s",
- (int)sizeof(remotes.desc[i].event_serv.peer_addr),
- remotes.desc[i].event_serv.peer_addr,
- ntohs(remotes.desc[i].event_serv.peer.sin_port),
- strerror(errno));
- }
- disconnect_client(epollfd, &remotes.desc[i]);
- continue;
+ memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used,
+ current->buf.ptr.raw,
+ current->event_json.json_bytes);
+ remotes.desc[i].buf.used += current->event_json.json_bytes;
}
- if (bytes_written == 0)
+
+ if (drain_main_buffer(&remotes.desc[i]) != 0)
{
- syslog(LOG_DAEMON,
- "Distributor connection to %.*s:%u closed during write",
- (int)sizeof(remotes.desc[i].event_serv.peer_addr),
- remotes.desc[i].event_serv.peer_addr,
- ntohs(remotes.desc[i].event_serv.peer.sin_port));
disconnect_client(epollfd, &remotes.desc[i]);
- continue;
}
- if ((size_t)bytes_written < remotes.desc[i].buf.used)
- {
- syslog(LOG_DAEMON,
- "Distributor wrote less than expected to %.*s:%u: %zd < %zu",
- (int)sizeof(remotes.desc[i].event_serv.peer_addr),
- remotes.desc[i].event_serv.peer_addr,
- ntohs(remotes.desc[i].event_serv.peer.sin_port),
- bytes_written,
- remotes.desc[i].buf.used);
- memmove(remotes.desc[i].buf.ptr.raw,
- remotes.desc[i].buf.ptr.raw + bytes_written,
- remotes.desc[i].buf.used - bytes_written);
- remotes.desc[i].buf.used -= bytes_written;
- continue;
- }
-
- remotes.desc[i].buf.used = 0;
}
memmove(current->buf.ptr.raw,