aboutsummaryrefslogtreecommitdiff
path: root/nDPIsrvd.c
diff options
context:
space:
mode:
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r--nDPIsrvd.c36
1 files changed, 35 insertions, 1 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index 5bbfd9763..379cfc20a 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -643,7 +643,41 @@ int main(int argc, char ** argv)
}
if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used)
{
- continue;
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Buffer capacity threshold (%zu of max %zu bytes) reached, "
+ "falling back to blocking mode.",
+ remotes.desc[i].buf.used,
+ remotes.desc[i].buf.max);
+ /*
+ * FIXME: Maybe switch to a Multithreading distributor data transmission,
+ * so that we do not have to switch back to blocking mode here!
+ * NOTE: If *one* distributer peer is too slow, all other distributors are
+ * affected by this. This causes starvation and leads to a possible data loss on
+ * the nDPId collector side.
+ */
+ int fd_flags = fcntl(remotes.desc[i].fd, F_GETFL, 0);
+ if (fd_flags == -1 || fcntl(remotes.desc[i].fd, F_SETFL, fd_flags & ~O_NONBLOCK) == -1)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno));
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
+ if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used) !=
+ (ssize_t)remotes.desc[i].buf.used)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Could not drain buffer by %zu bytes. (forced)",
+ remotes.desc[i].buf.used);
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
+ remotes.desc[i].buf.used = 0;
+ if (fcntl(remotes.desc[i].fd, F_SETFL, fd_flags) == -1)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno));
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
}
memcpy(remotes.desc[i].buf.ptr + remotes.desc[i].buf.used,