diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2020-08-03 17:12:30 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2020-08-03 17:12:30 +0200 |
commit | 61807ffd09aeb1598ee7f0ae6895acb296cc4e28 (patch) | |
tree | 10f36eca0752fc5af0b4c75556c2e36fd0e72e58 /nDPIsrvd.c | |
parent | 92925a83552299a9462566248dee3e16a57434a6 (diff) |
forwarding data from collector(client,source,UNIX-sock) to distributor(client,sink,TCP-sock)
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r-- | nDPIsrvd.c | 38 |
1 files changed, 30 insertions, 8 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c index b940a4fea..fcd15a2f9 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -18,8 +18,6 @@ enum ev_type { JSON_SOCK, SERV_SOCK }; struct remote_desc { enum ev_type type; int fd; - uint8_t buf[BUFSIZ]; - size_t buf_used; union { struct { int json_sockfd; @@ -125,8 +123,6 @@ static struct remote_desc * get_unused_remote_descriptor(void) for (size_t i = 0; i < remotes.desc_size; ++i) { if (remotes.desc[i].fd == -1) { remotes.desc_used++; - remotes.desc[i].buf[0] = '\0'; - remotes.desc[i].buf_used = 0; return &remotes.desc[i]; } } @@ -250,6 +246,11 @@ int main(void) continue; } + /* shutdown writing end for collector clients */ + if (current->type == JSON_SOCK) { + shutdown(current->fd, SHUT_WR); // collector + } + /* setup epoll event */ struct epoll_event accept_event = {}; accept_event.data.ptr = current; @@ -275,9 +276,8 @@ int main(void) } if (events[i].events & EPOLLIN) { errno = 0; - ssize_t bytes_read = read(current->fd, - current->buf + current->buf_used, - sizeof(current->buf) - current->buf_used); + char buf[BUFSIZ]; + ssize_t bytes_read = read(current->fd, buf, sizeof(buf)); if (bytes_read < 0 || errno != 0) { disconnect_client(epollfd, current); continue; @@ -289,7 +289,29 @@ int main(void) disconnect_client(epollfd, current); continue; } - current->buf_used += bytes_read; + + /* broadcast data coming from the json-collector socket to all tcp clients */ + if (current->type == JSON_SOCK) { + for (size_t i = 0; i < remotes.desc_size; ++i) { + if (remotes.desc[i].fd < 0) { + continue; + } + if (remotes.desc[i].type == SERV_SOCK) { + ssize_t bytes_written = write(remotes.desc[i].fd, buf, bytes_read); + if (bytes_written < 0 || errno != 0) { + disconnect_client(epollfd, current); + continue; + } + if (bytes_written == 0) { + syslog(LOG_DAEMON, "%s connection closed during write", (current->type == JSON_SOCK + ? "collector" + : "distributor")); + disconnect_client(epollfd, current); + continue; + } + } + } + } } } } |