diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2020-08-12 12:24:39 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2020-08-12 12:24:39 +0200 |
commit | 5e0a27d2137a4c03db6876840b818b57fea44b23 (patch) | |
tree | a185eb3ba8924ef19a452b5817b6123aee237ae3 /nDPIsrvd.c | |
parent | 8ccdadd3c7e269427ba6fce91d413eeeab67544a (diff) |
improved nDPIsrvd buffering if write returned EAGAIN
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r-- | nDPIsrvd.c | 135 |
1 files changed, 85 insertions, 50 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c index d9ba73c23..5aecb0bc3 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -23,18 +23,23 @@ enum ev_type SERV_SOCK }; +struct io_buffer { + uint8_t * ptr; + size_t used; + size_t max; +}; + struct remote_desc { enum ev_type type; int fd; - uint8_t buf[NETWORK_BUFFER_MAX_SIZE]; - size_t buf_used; - unsigned long long int buf_wanted; + struct io_buffer buf; union { struct { int json_sockfd; struct sockaddr_un peer; + unsigned long long int json_bytes; } event_json; struct { @@ -144,9 +149,9 @@ static struct remote_desc * get_unused_remote_descriptor(void) if (remotes.desc[i].fd == -1) { remotes.desc_used++; - remotes.desc[i].buf[0] = '\0'; - remotes.desc[i].buf_used = 0; - remotes.desc[i].buf_wanted = 0; + remotes.desc[i].buf.ptr = (uint8_t *) malloc(NETWORK_BUFFER_MAX_SIZE); + remotes.desc[i].buf.max = NETWORK_BUFFER_MAX_SIZE; + remotes.desc[i].buf.used = 0; return &remotes.desc[i]; } } @@ -167,6 +172,8 @@ static void disconnect_client(int epollfd, struct remote_desc * const current) syslog(LOG_DAEMON | LOG_ERR, "Error closing fd: %s", strerror(errno)); } } + free(current->buf.ptr); + current->buf.ptr = NULL; current->fd = -1; remotes.desc_used--; } @@ -254,6 +261,8 @@ int main(int argc, char ** argv) for (size_t i = 0; i < remotes.desc_size; ++i) { remotes.desc[i].fd = -1; + remotes.desc[i].buf.ptr = NULL; + remotes.desc[i].buf.max = 0; } if (create_listen_sockets() != 0) @@ -323,9 +332,9 @@ int main(int argc, char ** argv) } current->type = (events[i].data.fd == json_sockfd ? JSON_SOCK : SERV_SOCK); - int sockfd = (events[i].data.fd == json_sockfd ? json_sockfd : serv_sockfd); - socklen_t peer_addr_len = (events[i].data.fd == json_sockfd ? sizeof(current->event_json.peer) - : sizeof(current->event_serv.peer)); + int sockfd = (current->type == JSON_SOCK ? json_sockfd : serv_sockfd); + socklen_t peer_addr_len = (current->type == JSON_SOCK ? sizeof(current->event_json.peer) + : sizeof(current->event_serv.peer)); current->fd = accept(sockfd, (current->type == JSON_SOCK ? (struct sockaddr *)¤t->event_json.peer @@ -337,22 +346,22 @@ int main(int argc, char ** argv) disconnect_client(epollfd, current); continue; } - if (events[i].data.fd == serv_sockfd && inet_ntop(current->event_serv.peer.sin_family, - ¤t->event_serv.peer.sin_addr, - ¤t->event_serv.peer_addr[0], - sizeof(current->event_serv.peer_addr)) == NULL) - { - syslog(LOG_DAEMON | LOG_ERR, "Error converting an internet address: %s", strerror(errno)); - disconnect_client(epollfd, current); - continue; - } switch (current->type) { case JSON_SOCK: + current->event_json.json_bytes = 0; syslog(LOG_DAEMON, "New collector connection"); break; case SERV_SOCK: + if (inet_ntop(current->event_serv.peer.sin_family, + ¤t->event_serv.peer.sin_addr, + ¤t->event_serv.peer_addr[0], + sizeof(current->event_serv.peer_addr)) == NULL) + { + syslog(LOG_DAEMON | LOG_ERR, "Error converting an internet address: %s", strerror(errno)); + current->event_serv.peer_addr[0] = '\0'; + } syslog(LOG_DAEMON, "New distributor connection from %.*s:%u", (int)sizeof(current->event_serv.peer_addr), @@ -409,7 +418,7 @@ int main(int argc, char ** argv) /* read JSON strings (or parts) from the UNIX socket (collecting) */ errno = 0; ssize_t bytes_read = - read(current->fd, current->buf + current->buf_used, sizeof(current->buf) - current->buf_used); + read(current->fd, current->buf.ptr + current->buf.used, current->buf.max - current->buf.used); if (bytes_read < 0 || errno != 0) { disconnect_client(epollfd, current); @@ -428,56 +437,56 @@ int main(int argc, char ** argv) if (current->type == JSON_SOCK) { /* buffer all data until we got the whole JSON string */ - current->buf_used += bytes_read; - if (current->buf_wanted == 0) + current->buf.used += bytes_read; + if (current->event_json.json_bytes == 0) { char * json_str_start = NULL; errno = 0; /* the first bytes are the textual representation of the following JSON string */ - current->buf_wanted = strtoull((char *)current->buf, &json_str_start, 10); - current->buf_wanted += (uint8_t *)json_str_start - current->buf; + current->event_json.json_bytes = strtoull((char *)current->buf.ptr, &json_str_start, 10); + current->event_json.json_bytes += (uint8_t *)json_str_start - current->buf.ptr; if (errno == ERANGE) { - current->buf_used = 0; - current->buf_wanted = 0; + current->buf.used = 0; + current->event_json.json_bytes = 0; syslog(LOG_DAEMON | LOG_ERR, "Size of JSON exceeds limit"); continue; } - if ((uint8_t *)json_str_start == current->buf) + if ((uint8_t *)json_str_start == current->buf.ptr) { syslog(LOG_DAEMON | LOG_ERR, "Missing size before JSON string: %.*s", - (int)current->buf_used, - current->buf); - current->buf_used = 0; - current->buf_wanted = 0; + (int)current->buf.used, + current->buf.ptr); + current->buf.used = 0; + current->event_json.json_bytes = 0; continue; } - if (current->buf_wanted > sizeof(current->buf)) + if (current->event_json.json_bytes > current->buf.max) { syslog(LOG_DAEMON | LOG_ERR, "BUG: JSON string too big: %llu > %zu", - current->buf_wanted, - sizeof(current->buf)); - current->buf_used = 0; - current->buf_wanted = 0; + current->event_json.json_bytes, + current->buf.max); + current->buf.used = 0; + current->event_json.json_bytes = 0; continue; } } /* buffered enough data (full JSON String) ? */ - if (current->buf_wanted > current->buf_used) + if (current->event_json.json_bytes > current->buf.used) { continue; } /* after buffering complete, last character should always be a '}' (end of object) */ - if (current->buf[current->buf_wanted - 1] != '}') + if (current->buf.ptr[current->event_json.json_bytes - 1] != '}') { syslog(LOG_DAEMON | LOG_ERR, "Invalid JSON string: %.*s", - (int)current->buf_wanted, - current->buf); - current->buf_used = 0; - current->buf_wanted = 0; + (int)current->event_json.json_bytes, + current->buf.ptr); + current->buf.used = 0; + current->event_json.json_bytes = 0; continue; } @@ -490,10 +499,36 @@ int main(int argc, char ** argv) } if (remotes.desc[i].type == SERV_SOCK) { - ssize_t bytes_written = write(remotes.desc[i].fd, current->buf, current->buf_wanted); - if (errno == EAGAIN) { - /* TODO: Prevent data loss */ - syslog(LOG_DAEMON | LOG_ERR, "Distributor write buffer bloat; Data loss!"); + ssize_t bytes_written; + + if (remotes.desc[i].buf.used > 0) { + bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, + remotes.desc[i].buf.used); + if (bytes_written > 0) { + memmove(remotes.desc[i].buf.ptr, + remotes.desc[i].buf.ptr + remotes.desc[i].event_json.json_bytes, + remotes.desc[i].buf.used - remotes.desc[i].event_json.json_bytes); + remotes.desc[i].buf.used -= remotes.desc[i].event_json.json_bytes; + } + } + + bytes_written = write(remotes.desc[i].fd, current->buf.ptr, + current->event_json.json_bytes); + if (errno == EAGAIN) + { + if ((unsigned long long int)bytes_written < current->event_json.json_bytes) { + if (remotes.desc[i].buf.max - remotes.desc[i].buf.used < current->event_json.json_bytes) + { + syslog(LOG_DAEMON | LOG_ERR, "Distributor write buffer bloat, no more space available."); + disconnect_client(epollfd, &remotes.desc[i]); + continue; + } + syslog(LOG_DAEMON | LOG_ERR, "Distributor write temporarily failed, buffering .."); + memcpy(remotes.desc[i].buf.ptr, + remotes.desc[i].buf.ptr + remotes.desc[i].buf.used, + current->event_json.json_bytes); + remotes.desc[i].buf.used += current->event_json.json_bytes; + } continue; } if (bytes_written < 0 || errno != 0) @@ -514,11 +549,11 @@ int main(int argc, char ** argv) } } - memmove(current->buf, - current->buf + current->buf_wanted, - current->buf_used - current->buf_wanted); - current->buf_used -= current->buf_wanted; - current->buf_wanted = 0; + memmove(current->buf.ptr, + current->buf.ptr + current->event_json.json_bytes, + current->buf.used - current->event_json.json_bytes); + current->buf.used -= current->event_json.json_bytes; + current->event_json.json_bytes = 0; } } } |