aboutsummaryrefslogtreecommitdiff
path: root/nDPIsrvd.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2020-08-12 12:24:39 +0200
committerToni Uhlig <matzeton@googlemail.com>2020-08-12 12:24:39 +0200
commit5e0a27d2137a4c03db6876840b818b57fea44b23 (patch)
treea185eb3ba8924ef19a452b5817b6123aee237ae3 /nDPIsrvd.c
parent8ccdadd3c7e269427ba6fce91d413eeeab67544a (diff)
improved nDPIsrvd buffering if write returned EAGAIN
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r--nDPIsrvd.c135
1 files changed, 85 insertions, 50 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index d9ba73c23..5aecb0bc3 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -23,18 +23,23 @@ enum ev_type
SERV_SOCK
};
+struct io_buffer {
+ uint8_t * ptr;
+ size_t used;
+ size_t max;
+};
+
struct remote_desc
{
enum ev_type type;
int fd;
- uint8_t buf[NETWORK_BUFFER_MAX_SIZE];
- size_t buf_used;
- unsigned long long int buf_wanted;
+ struct io_buffer buf;
union {
struct
{
int json_sockfd;
struct sockaddr_un peer;
+ unsigned long long int json_bytes;
} event_json;
struct
{
@@ -144,9 +149,9 @@ static struct remote_desc * get_unused_remote_descriptor(void)
if (remotes.desc[i].fd == -1)
{
remotes.desc_used++;
- remotes.desc[i].buf[0] = '\0';
- remotes.desc[i].buf_used = 0;
- remotes.desc[i].buf_wanted = 0;
+ remotes.desc[i].buf.ptr = (uint8_t *) malloc(NETWORK_BUFFER_MAX_SIZE);
+ remotes.desc[i].buf.max = NETWORK_BUFFER_MAX_SIZE;
+ remotes.desc[i].buf.used = 0;
return &remotes.desc[i];
}
}
@@ -167,6 +172,8 @@ static void disconnect_client(int epollfd, struct remote_desc * const current)
syslog(LOG_DAEMON | LOG_ERR, "Error closing fd: %s", strerror(errno));
}
}
+ free(current->buf.ptr);
+ current->buf.ptr = NULL;
current->fd = -1;
remotes.desc_used--;
}
@@ -254,6 +261,8 @@ int main(int argc, char ** argv)
for (size_t i = 0; i < remotes.desc_size; ++i)
{
remotes.desc[i].fd = -1;
+ remotes.desc[i].buf.ptr = NULL;
+ remotes.desc[i].buf.max = 0;
}
if (create_listen_sockets() != 0)
@@ -323,9 +332,9 @@ int main(int argc, char ** argv)
}
current->type = (events[i].data.fd == json_sockfd ? JSON_SOCK : SERV_SOCK);
- int sockfd = (events[i].data.fd == json_sockfd ? json_sockfd : serv_sockfd);
- socklen_t peer_addr_len = (events[i].data.fd == json_sockfd ? sizeof(current->event_json.peer)
- : sizeof(current->event_serv.peer));
+ int sockfd = (current->type == JSON_SOCK ? json_sockfd : serv_sockfd);
+ socklen_t peer_addr_len = (current->type == JSON_SOCK ? sizeof(current->event_json.peer)
+ : sizeof(current->event_serv.peer));
current->fd = accept(sockfd,
(current->type == JSON_SOCK ? (struct sockaddr *)&current->event_json.peer
@@ -337,22 +346,22 @@ int main(int argc, char ** argv)
disconnect_client(epollfd, current);
continue;
}
- if (events[i].data.fd == serv_sockfd && inet_ntop(current->event_serv.peer.sin_family,
- &current->event_serv.peer.sin_addr,
- &current->event_serv.peer_addr[0],
- sizeof(current->event_serv.peer_addr)) == NULL)
- {
- syslog(LOG_DAEMON | LOG_ERR, "Error converting an internet address: %s", strerror(errno));
- disconnect_client(epollfd, current);
- continue;
- }
switch (current->type)
{
case JSON_SOCK:
+ current->event_json.json_bytes = 0;
syslog(LOG_DAEMON, "New collector connection");
break;
case SERV_SOCK:
+ if (inet_ntop(current->event_serv.peer.sin_family,
+ &current->event_serv.peer.sin_addr,
+ &current->event_serv.peer_addr[0],
+ sizeof(current->event_serv.peer_addr)) == NULL)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error converting an internet address: %s", strerror(errno));
+ current->event_serv.peer_addr[0] = '\0';
+ }
syslog(LOG_DAEMON,
"New distributor connection from %.*s:%u",
(int)sizeof(current->event_serv.peer_addr),
@@ -409,7 +418,7 @@ int main(int argc, char ** argv)
/* read JSON strings (or parts) from the UNIX socket (collecting) */
errno = 0;
ssize_t bytes_read =
- read(current->fd, current->buf + current->buf_used, sizeof(current->buf) - current->buf_used);
+ read(current->fd, current->buf.ptr + current->buf.used, current->buf.max - current->buf.used);
if (bytes_read < 0 || errno != 0)
{
disconnect_client(epollfd, current);
@@ -428,56 +437,56 @@ int main(int argc, char ** argv)
if (current->type == JSON_SOCK)
{
/* buffer all data until we got the whole JSON string */
- current->buf_used += bytes_read;
- if (current->buf_wanted == 0)
+ current->buf.used += bytes_read;
+ if (current->event_json.json_bytes == 0)
{
char * json_str_start = NULL;
errno = 0;
/* the first bytes are the textual representation of the following JSON string */
- current->buf_wanted = strtoull((char *)current->buf, &json_str_start, 10);
- current->buf_wanted += (uint8_t *)json_str_start - current->buf;
+ current->event_json.json_bytes = strtoull((char *)current->buf.ptr, &json_str_start, 10);
+ current->event_json.json_bytes += (uint8_t *)json_str_start - current->buf.ptr;
if (errno == ERANGE)
{
- current->buf_used = 0;
- current->buf_wanted = 0;
+ current->buf.used = 0;
+ current->event_json.json_bytes = 0;
syslog(LOG_DAEMON | LOG_ERR, "Size of JSON exceeds limit");
continue;
}
- if ((uint8_t *)json_str_start == current->buf)
+ if ((uint8_t *)json_str_start == current->buf.ptr)
{
syslog(LOG_DAEMON | LOG_ERR,
"Missing size before JSON string: %.*s",
- (int)current->buf_used,
- current->buf);
- current->buf_used = 0;
- current->buf_wanted = 0;
+ (int)current->buf.used,
+ current->buf.ptr);
+ current->buf.used = 0;
+ current->event_json.json_bytes = 0;
continue;
}
- if (current->buf_wanted > sizeof(current->buf))
+ if (current->event_json.json_bytes > current->buf.max)
{
syslog(LOG_DAEMON | LOG_ERR,
"BUG: JSON string too big: %llu > %zu",
- current->buf_wanted,
- sizeof(current->buf));
- current->buf_used = 0;
- current->buf_wanted = 0;
+ current->event_json.json_bytes,
+ current->buf.max);
+ current->buf.used = 0;
+ current->event_json.json_bytes = 0;
continue;
}
}
/* buffered enough data (full JSON String) ? */
- if (current->buf_wanted > current->buf_used)
+ if (current->event_json.json_bytes > current->buf.used)
{
continue;
}
/* after buffering complete, last character should always be a '}' (end of object) */
- if (current->buf[current->buf_wanted - 1] != '}')
+ if (current->buf.ptr[current->event_json.json_bytes - 1] != '}')
{
syslog(LOG_DAEMON | LOG_ERR,
"Invalid JSON string: %.*s",
- (int)current->buf_wanted,
- current->buf);
- current->buf_used = 0;
- current->buf_wanted = 0;
+ (int)current->event_json.json_bytes,
+ current->buf.ptr);
+ current->buf.used = 0;
+ current->event_json.json_bytes = 0;
continue;
}
@@ -490,10 +499,36 @@ int main(int argc, char ** argv)
}
if (remotes.desc[i].type == SERV_SOCK)
{
- ssize_t bytes_written = write(remotes.desc[i].fd, current->buf, current->buf_wanted);
- if (errno == EAGAIN) {
- /* TODO: Prevent data loss */
- syslog(LOG_DAEMON | LOG_ERR, "Distributor write buffer bloat; Data loss!");
+ ssize_t bytes_written;
+
+ if (remotes.desc[i].buf.used > 0) {
+ bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr,
+ remotes.desc[i].buf.used);
+ if (bytes_written > 0) {
+ memmove(remotes.desc[i].buf.ptr,
+ remotes.desc[i].buf.ptr + remotes.desc[i].event_json.json_bytes,
+ remotes.desc[i].buf.used - remotes.desc[i].event_json.json_bytes);
+ remotes.desc[i].buf.used -= remotes.desc[i].event_json.json_bytes;
+ }
+ }
+
+ bytes_written = write(remotes.desc[i].fd, current->buf.ptr,
+ current->event_json.json_bytes);
+ if (errno == EAGAIN)
+ {
+ if ((unsigned long long int)bytes_written < current->event_json.json_bytes) {
+ if (remotes.desc[i].buf.max - remotes.desc[i].buf.used < current->event_json.json_bytes)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Distributor write buffer bloat, no more space available.");
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
+ syslog(LOG_DAEMON | LOG_ERR, "Distributor write temporarily failed, buffering ..");
+ memcpy(remotes.desc[i].buf.ptr,
+ remotes.desc[i].buf.ptr + remotes.desc[i].buf.used,
+ current->event_json.json_bytes);
+ remotes.desc[i].buf.used += current->event_json.json_bytes;
+ }
continue;
}
if (bytes_written < 0 || errno != 0)
@@ -514,11 +549,11 @@ int main(int argc, char ** argv)
}
}
- memmove(current->buf,
- current->buf + current->buf_wanted,
- current->buf_used - current->buf_wanted);
- current->buf_used -= current->buf_wanted;
- current->buf_wanted = 0;
+ memmove(current->buf.ptr,
+ current->buf.ptr + current->event_json.json_bytes,
+ current->buf.used - current->event_json.json_bytes);
+ current->buf.used -= current->event_json.json_bytes;
+ current->event_json.json_bytes = 0;
}
}
}