diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2021-08-05 01:14:04 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2021-08-05 02:02:51 +0200 |
commit | 6faded3cc7084cb898773dafc1ca9422242f9c81 (patch) | |
tree | 7beae11228ee671362af1e88f397bd80ead59e3b /nDPIsrvd.c | |
parent | d48508b4afe5f3a22c2dda733ee13554d5c5ae60 (diff) |
Improved and Fixed another buffering issue caused by removing an outgoing fd too early from epoll queue (EPOLLOUT).
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r-- | nDPIsrvd.c | 186 |
1 files changed, 105 insertions, 81 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 781071fe2..a63a410f1 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -126,7 +126,7 @@ static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, else { syslog(LOG_DAEMON | LOG_ERR, - "Buffer cache limit (%u lines) reached, falling back to blocking I/O.", + "Buffer JSON string cache limit (%u lines) reached, falling back to blocking I/O.", utarray_len(remote->buf_cache)); if (drain_cache_blocking(remote) != 0) { @@ -144,10 +144,79 @@ static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, return 0; } +static int drain_main_buffer(struct remote_desc * const remote) +{ + if (remote->buf.used == 0) + { + return 0; + } + + errno = 0; + ssize_t bytes_written = write(remote->fd, remote->buf.ptr.raw, remote->buf.used); + if (errno == EAGAIN) + { + return 0; + } + if (bytes_written < 0 || errno != 0) + { + if (remote->event_serv.peer_addr[0] == '\0') + { + syslog(LOG_DAEMON | LOG_ERR, "Distributor connection closed, send failed: %s", strerror(errno)); + } + else + { + syslog(LOG_DAEMON | LOG_ERR, + "Distributor connection to %.*s:%u closed, send failed: %s", + (int)sizeof(remote->event_serv.peer_addr), + remote->event_serv.peer_addr, + ntohs(remote->event_serv.peer.sin_port), + strerror(errno)); + } + return -1; + } + if (bytes_written == 0) + { + if (remote->event_serv.peer_addr[0] == '\0') + { + syslog(LOG_DAEMON, "%s", "Distributor connection closed during write"); + } + else + { + syslog(LOG_DAEMON, + "Distributor connection to %.*s:%u closed during write", + (int)sizeof(remote->event_serv.peer_addr), + remote->event_serv.peer_addr, + ntohs(remote->event_serv.peer.sin_port)); + } + return -1; + } + if ((size_t)bytes_written < remote->buf.used) + { + syslog(LOG_DAEMON, + "Distributor wrote less than expected to %.*s:%u: %zd < %zu", + (int)sizeof(remote->event_serv.peer_addr), + remote->event_serv.peer_addr, + ntohs(remote->event_serv.peer.sin_port), + bytes_written, + remote->buf.used); + memmove(remote->buf.ptr.raw, remote->buf.ptr.raw + bytes_written, remote->buf.used - bytes_written); + remote->buf.used -= bytes_written; + return -1; + } + + remote->buf.used = 0; + return 0; +} + static int drain_cache(struct remote_desc * const remote) { errno = 0; + if (drain_main_buffer(remote) != 0) + { + return -1; + } + while (utarray_len(remote->buf_cache) > 0) { struct nDPIsrvd_buffer * buf = (struct nDPIsrvd_buffer *)utarray_front(remote->buf_cache); @@ -206,9 +275,13 @@ static int handle_outgoing_data(int epollfd, struct remote_desc * const remote) } if (drain_cache(remote) != 0) { + disconnect_client(epollfd, remote); return -1; } - del_event(epollfd, remote->fd); + if (utarray_len(remote->buf_cache) == 0) + { + del_event(epollfd, remote->fd); + } return 0; } @@ -247,20 +320,11 @@ static int create_listen_sockets(void) return 1; } - int opt = NETWORK_BUFFER_MAX_SIZE * 16; - if (setsockopt(serv_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0 || - setsockopt(json_sockfd, SOL_SOCKET, SO_RCVBUF, &opt, sizeof(opt)) < 0) - { - syslog(LOG_DAEMON | LOG_ERR, "setsockopt with SO_RCVBUF/SO_SNDBUF failed: %s", strerror(errno)); - return 1; - } - - opt = 1; + int opt = 1; if (setsockopt(json_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0 || setsockopt(serv_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { syslog(LOG_DAEMON | LOG_ERR, "setsockopt with SO_REUSEADDR failed: %s", strerror(errno)); - return 1; } struct sockaddr_un json_addr; @@ -546,12 +610,18 @@ static int new_connection(int epollfd, int eventfd) } char const * sock_type = NULL; + int sockopt = NETWORK_BUFFER_MAX_SIZE; switch (current->sock_type) { case JSON_SOCK: sock_type = "collector"; current->event_json.json_bytes = 0; syslog(LOG_DAEMON, "New collector connection"); + + if (setsockopt(current->fd, SOL_SOCKET, SO_RCVBUF, &sockopt, sizeof(sockopt)) < 0) + { + syslog(LOG_DAEMON | LOG_ERR, "Error setting socket option SO_RCVBUF: %s", strerror(errno)); + } break; case SERV_SOCK: sock_type = "distributor"; @@ -584,6 +654,10 @@ static int new_connection(int epollfd, int eventfd) { syslog(LOG_DAEMON | LOG_ERR, "Error setting socket option send timeout: %s", strerror(errno)); } + if (setsockopt(current->fd, SOL_SOCKET, SO_SNDBUF, &sockopt, sizeof(sockopt)) < 0) + { + syslog(LOG_DAEMON | LOG_ERR, "Error setting socket option SO_SNDBUF: %s", strerror(errno)); + } } break; } @@ -726,89 +800,39 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) { continue; } - if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used && - utarray_len(remotes.desc[i].buf_cache) == 0) + if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used || + utarray_len(remotes.desc[i].buf_cache) > 0) { - syslog(LOG_DAEMON, "Buffer capacity threshold (%zu bytes) reached, caching.", remotes.desc[i].buf.used); - if (add_to_cache(&remotes.desc[i], remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used) != 0) - { - disconnect_client(epollfd, &remotes.desc[i]); - continue; - } - errno = 0; - if (add_out_event(epollfd, remotes.desc[i].fd, &remotes.desc[i]) != 0) + if (utarray_len(remotes.desc[i].buf_cache) == 0) { - syslog(LOG_DAEMON | LOG_ERR, "%s: %s", "Could not add event, disconnecting", strerror(errno)); - disconnect_client(epollfd, &remotes.desc[i]); - continue; + syslog(LOG_DAEMON, "Buffer capacity threshold (%zu bytes) reached, caching JSON strings.", remotes.desc[i].buf.used); + errno = 0; + if (add_out_event(epollfd, remotes.desc[i].fd, &remotes.desc[i]) != 0) + { + syslog(LOG_DAEMON | LOG_ERR, "%s: %s", "Could not add event, disconnecting", strerror(errno)); + disconnect_client(epollfd, &remotes.desc[i]); + continue; + } } - } - if (utarray_len(remotes.desc[i].buf_cache) > 0) - { if (add_to_cache(&remotes.desc[i], current->buf.ptr.raw, current->event_json.json_bytes) != 0) { disconnect_client(epollfd, &remotes.desc[i]); + continue; } - continue; } - - memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used, - current->buf.ptr.raw, - current->event_json.json_bytes); - remotes.desc[i].buf.used += current->event_json.json_bytes; - - errno = 0; - ssize_t bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used); - if (errno == EAGAIN) - { - continue; - } - if (bytes_written < 0 || errno != 0) + else { - if (remotes.desc[i].event_serv.peer_addr[0] == '\0') - { - syslog(LOG_DAEMON | LOG_ERR, "Distributor connection closed, send failed: %s", strerror(errno)); - } - else - { - syslog(LOG_DAEMON | LOG_ERR, - "Distributor connection to %.*s:%u closed, send failed: %s", - (int)sizeof(remotes.desc[i].event_serv.peer_addr), - remotes.desc[i].event_serv.peer_addr, - ntohs(remotes.desc[i].event_serv.peer.sin_port), - strerror(errno)); - } - disconnect_client(epollfd, &remotes.desc[i]); - continue; + memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used, + current->buf.ptr.raw, + current->event_json.json_bytes); + remotes.desc[i].buf.used += current->event_json.json_bytes; } - if (bytes_written == 0) + + if (drain_main_buffer(&remotes.desc[i]) != 0) { - syslog(LOG_DAEMON, - "Distributor connection to %.*s:%u closed during write", - (int)sizeof(remotes.desc[i].event_serv.peer_addr), - remotes.desc[i].event_serv.peer_addr, - ntohs(remotes.desc[i].event_serv.peer.sin_port)); disconnect_client(epollfd, &remotes.desc[i]); - continue; } - if ((size_t)bytes_written < remotes.desc[i].buf.used) - { - syslog(LOG_DAEMON, - "Distributor wrote less than expected to %.*s:%u: %zd < %zu", - (int)sizeof(remotes.desc[i].event_serv.peer_addr), - remotes.desc[i].event_serv.peer_addr, - ntohs(remotes.desc[i].event_serv.peer.sin_port), - bytes_written, - remotes.desc[i].buf.used); - memmove(remotes.desc[i].buf.ptr.raw, - remotes.desc[i].buf.ptr.raw + bytes_written, - remotes.desc[i].buf.used - bytes_written); - remotes.desc[i].buf.used -= bytes_written; - continue; - } - - remotes.desc[i].buf.used = 0; } memmove(current->buf.ptr.raw, |