diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2020-12-19 20:51:21 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2020-12-19 20:51:21 +0100 |
commit | 9f3d7b479ce03f5c605f5eb64cd07feaab9a76c4 (patch) | |
tree | 497783cf615b5c6d5b20c1256146b8a730821820 /nDPIsrvd.c | |
parent | b37cafe910bfc2aee355a84d960b5bd6cfc81ce3 (diff) |
nDPIsrvd: Distributor clients which are too slow can cause buffer bloat.
Switching back to blocking mode works as a quick fix but is not sufficient.
See comments.
* nDPId prints more accurate error messages if command line argument validation failed
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r-- | nDPIsrvd.c | 36 |
1 files changed, 35 insertions, 1 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 5bbfd9763..379cfc20a 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -643,7 +643,41 @@ int main(int argc, char ** argv) } if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used) { - continue; + syslog(LOG_DAEMON | LOG_ERR, + "Buffer capacity threshold (%zu of max %zu bytes) reached, " + "falling back to blocking mode.", + remotes.desc[i].buf.used, + remotes.desc[i].buf.max); + /* + * FIXME: Maybe switch to a Multithreading distributor data transmission, + * so that we do not have to switch back to blocking mode here! + * NOTE: If *one* distributer peer is too slow, all other distributors are + * affected by this. This causes starvation and leads to a possible data loss on + * the nDPId collector side. + */ + int fd_flags = fcntl(remotes.desc[i].fd, F_GETFL, 0); + if (fd_flags == -1 || fcntl(remotes.desc[i].fd, F_SETFL, fd_flags & ~O_NONBLOCK) == -1) + { + syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno)); + disconnect_client(epollfd, &remotes.desc[i]); + continue; + } + if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used) != + (ssize_t)remotes.desc[i].buf.used) + { + syslog(LOG_DAEMON | LOG_ERR, + "Could not drain buffer by %zu bytes. (forced)", + remotes.desc[i].buf.used); + disconnect_client(epollfd, &remotes.desc[i]); + continue; + } + remotes.desc[i].buf.used = 0; + if (fcntl(remotes.desc[i].fd, F_SETFL, fd_flags) == -1) + { + syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno)); + disconnect_client(epollfd, &remotes.desc[i]); + continue; + } } memcpy(remotes.desc[i].buf.ptr + remotes.desc[i].buf.used, |