summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--nDPIsrvd.c38
1 files changed, 30 insertions, 8 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index b940a4fea..fcd15a2f9 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -18,8 +18,6 @@ enum ev_type { JSON_SOCK, SERV_SOCK };
struct remote_desc {
enum ev_type type;
int fd;
- uint8_t buf[BUFSIZ];
- size_t buf_used;
union {
struct {
int json_sockfd;
@@ -125,8 +123,6 @@ static struct remote_desc * get_unused_remote_descriptor(void)
for (size_t i = 0; i < remotes.desc_size; ++i) {
if (remotes.desc[i].fd == -1) {
remotes.desc_used++;
- remotes.desc[i].buf[0] = '\0';
- remotes.desc[i].buf_used = 0;
return &remotes.desc[i];
}
}
@@ -250,6 +246,11 @@ int main(void)
continue;
}
+ /* shutdown writing end for collector clients */
+ if (current->type == JSON_SOCK) {
+ shutdown(current->fd, SHUT_WR); // collector
+ }
+
/* setup epoll event */
struct epoll_event accept_event = {};
accept_event.data.ptr = current;
@@ -275,9 +276,8 @@ int main(void)
}
if (events[i].events & EPOLLIN) {
errno = 0;
- ssize_t bytes_read = read(current->fd,
- current->buf + current->buf_used,
- sizeof(current->buf) - current->buf_used);
+ char buf[BUFSIZ];
+ ssize_t bytes_read = read(current->fd, buf, sizeof(buf));
if (bytes_read < 0 || errno != 0) {
disconnect_client(epollfd, current);
continue;
@@ -289,7 +289,29 @@ int main(void)
disconnect_client(epollfd, current);
continue;
}
- current->buf_used += bytes_read;
+
+ /* broadcast data coming from the json-collector socket to all tcp clients */
+ if (current->type == JSON_SOCK) {
+ for (size_t i = 0; i < remotes.desc_size; ++i) {
+ if (remotes.desc[i].fd < 0) {
+ continue;
+ }
+ if (remotes.desc[i].type == SERV_SOCK) {
+ ssize_t bytes_written = write(remotes.desc[i].fd, buf, bytes_read);
+ if (bytes_written < 0 || errno != 0) {
+ disconnect_client(epollfd, current);
+ continue;
+ }
+ if (bytes_written == 0) {
+ syslog(LOG_DAEMON, "%s connection closed during write", (current->type == JSON_SOCK
+ ? "collector"
+ : "distributor"));
+ disconnect_client(epollfd, current);
+ continue;
+ }
+ }
+ }
+ }
}
}
}