diff options
-rw-r--r-- | nDPIsrvd.c | 702 |
1 files changed, 371 insertions, 331 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 030393bfd..082a5c53a 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -24,13 +24,15 @@ struct io_buffer size_t max; }; +enum sock_type +{ + JSON_SOCK, + SERV_SOCK +}; + struct remote_desc { - enum - { - JSON_SOCK, - SERV_SOCK - } type; + enum sock_type sock_type; int fd; struct io_buffer buf; union { @@ -264,8 +266,370 @@ static int parse_options(int argc, char ** argv) return 0; } +static int new_connection(int epollfd, int eventfd) +{ + struct remote_desc * current = get_unused_remote_descriptor(); + + if (current == NULL) + { + syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes.desc_used); + return 1; + } + current->sock_type = (eventfd == json_sockfd ? JSON_SOCK : SERV_SOCK); + + int sockfd = (current->sock_type == JSON_SOCK ? json_sockfd : serv_sockfd); + socklen_t peer_addr_len = + (current->sock_type == JSON_SOCK ? sizeof(current->event_json.peer) : sizeof(current->event_serv.peer)); + + current->fd = accept(sockfd, + (current->sock_type == JSON_SOCK ? (struct sockaddr *)¤t->event_json.peer + : (struct sockaddr *)¤t->event_serv.peer), + &peer_addr_len); + if (current->fd < 0) + { + syslog(LOG_DAEMON | LOG_ERR, "Accept failed: %s", strerror(errno)); + disconnect_client(epollfd, current); + return 1; + } + + switch (current->sock_type) + { + case JSON_SOCK: + current->event_json.json_bytes = 0; + syslog(LOG_DAEMON, "New collector connection"); + break; + case SERV_SOCK: + if (inet_ntop(current->event_serv.peer.sin_family, + ¤t->event_serv.peer.sin_addr, + ¤t->event_serv.peer_addr[0], + sizeof(current->event_serv.peer_addr)) == NULL) + { + if (errno == EAFNOSUPPORT) + { + syslog(LOG_DAEMON | LOG_ERR, "New distributor connection."); + } + else + { + syslog(LOG_DAEMON | LOG_ERR, "Error converting an internet address: %s", strerror(errno)); + } + current->event_serv.peer_addr[0] = '\0'; + } + else + { + syslog(LOG_DAEMON, + "New distributor connection from %.*s:%u", + (int)sizeof(current->event_serv.peer_addr), + current->event_serv.peer_addr, + ntohs(current->event_serv.peer.sin_port)); + } + break; + } + + /* nonblocking fd is mandatory */ + int fd_flags = fcntl(current->fd, F_GETFL, 0); + if (fd_flags == -1 || fcntl(current->fd, F_SETFL, fd_flags | O_NONBLOCK) == -1) + { + syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno)); + disconnect_client(epollfd, current); + return 1; + } + + /* shutdown writing end for collector clients */ + if (current->sock_type == JSON_SOCK) + { + shutdown(current->fd, SHUT_WR); // collector + /* setup epoll event */ + struct epoll_event accept_event = {}; + accept_event.data.ptr = current; + accept_event.events = EPOLLIN; + if (epoll_ctl(epollfd, EPOLL_CTL_ADD, current->fd, &accept_event) < 0) + { + disconnect_client(epollfd, current); + return 1; + } + } + else + { + shutdown(current->fd, SHUT_RD); // distributor + } + + return 0; +} + +static int handle_collector_protocol(int epollfd, struct remote_desc * const current) +{ + char * json_str_start = NULL; + + if (current->buf.ptr[NETWORK_BUFFER_LENGTH_DIGITS] != '{') + { + syslog(LOG_DAEMON | LOG_ERR, + "BUG: JSON invalid opening character: '%c'", + current->buf.ptr[NETWORK_BUFFER_LENGTH_DIGITS]); + disconnect_client(epollfd, current); + return 1; + } + + errno = 0; + current->event_json.json_bytes = strtoull((char *)current->buf.ptr, &json_str_start, 10); + current->event_json.json_bytes += (uint8_t *)json_str_start - current->buf.ptr; + + if (errno == ERANGE) + { + syslog(LOG_DAEMON | LOG_ERR, "BUG: Size of JSON exceeds limit"); + disconnect_client(epollfd, current); + return 1; + } + + if ((uint8_t *)json_str_start == current->buf.ptr) + { + syslog(LOG_DAEMON | LOG_ERR, + "BUG: Missing size before JSON string: \"%.*s\"", + NETWORK_BUFFER_LENGTH_DIGITS, + current->buf.ptr); + disconnect_client(epollfd, current); + return 1; + } + + if (current->event_json.json_bytes > current->buf.max) + { + syslog(LOG_DAEMON | LOG_ERR, + "BUG: JSON string too big: %llu > %zu", + current->event_json.json_bytes, + current->buf.max); + disconnect_client(epollfd, current); + return 1; + } + + if (current->event_json.json_bytes > current->buf.used) + { + return 1; + } + + if (current->buf.ptr[current->event_json.json_bytes - 2] != '}' || + current->buf.ptr[current->event_json.json_bytes - 1] != '\n') + { + syslog(LOG_DAEMON | LOG_ERR, + "BUG: Invalid JSON string: %.*s", + (int)current->event_json.json_bytes, + current->buf.ptr); + disconnect_client(epollfd, current); + return 1; + } + + return 0; +} + +static int handle_incoming_data(int epollfd, struct epoll_event * event) +{ + struct remote_desc * current = (struct remote_desc *)event->data.ptr; + + if (current->fd < 0) + { + syslog(LOG_DAEMON | LOG_ERR, "file descriptor `%d' got from event data invalid", current->fd); + return 1; + } + + if (event->events & EPOLLIN && current->sock_type == JSON_SOCK) + { + /* read JSON strings (or parts) from the UNIX socket (collecting) */ + if (current->buf.used == current->buf.max) + { + syslog(LOG_DAEMON, "Collector read buffer full. No more read possible."); + } + else + { + errno = 0; + ssize_t bytes_read = + read(current->fd, current->buf.ptr + current->buf.used, current->buf.max - current->buf.used); + if (bytes_read < 0 || errno != 0) + { + disconnect_client(epollfd, current); + return 1; + } + if (bytes_read == 0) + { + syslog(LOG_DAEMON, "Collector connection closed during read"); + disconnect_client(epollfd, current); + return 1; + } + current->buf.used += bytes_read; + } + + while (current->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1) + { + if (handle_collector_protocol(epollfd, current) != 0) + { + break; + } + + for (size_t i = 0; i < remotes.desc_size; ++i) + { + if (remotes.desc[i].fd < 0) + { + continue; + } + if (remotes.desc[i].sock_type != SERV_SOCK) + { + continue; + } + if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used) + { + syslog(LOG_DAEMON | LOG_ERR, + "Buffer capacity threshold (%zu of max %zu bytes) reached, " + "falling back to blocking mode.", + remotes.desc[i].buf.used, + remotes.desc[i].buf.max); + /* + * FIXME: Maybe switch to a Multithreading distributor data transmission, + * so that we do not have to switch back to blocking mode here! + * NOTE: If *one* distributer peer is too slow, all other distributors are + * affected by this. This causes starvation and leads to a possible data loss on + * the nDPId collector side. + */ + int fd_flags = fcntl(remotes.desc[i].fd, F_GETFL, 0); + if (fd_flags == -1 || fcntl(remotes.desc[i].fd, F_SETFL, fd_flags & ~O_NONBLOCK) == -1) + { + syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno)); + disconnect_client(epollfd, &remotes.desc[i]); + continue; + } + if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used) != + (ssize_t)remotes.desc[i].buf.used) + { + syslog(LOG_DAEMON | LOG_ERR, + "Could not drain buffer by %zu bytes. (forced)", + remotes.desc[i].buf.used); + disconnect_client(epollfd, &remotes.desc[i]); + continue; + } + remotes.desc[i].buf.used = 0; + if (fcntl(remotes.desc[i].fd, F_SETFL, fd_flags) == -1) + { + syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno)); + disconnect_client(epollfd, &remotes.desc[i]); + continue; + } + } + + memcpy(remotes.desc[i].buf.ptr + remotes.desc[i].buf.used, + current->buf.ptr, + current->event_json.json_bytes); + remotes.desc[i].buf.used += current->event_json.json_bytes; + + errno = 0; + ssize_t bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used); + if (errno == EAGAIN) + { + continue; + } + if (bytes_written < 0 || errno != 0) + { + if (remotes.desc[i].event_serv.peer_addr[0] == '\0') + { + syslog(LOG_DAEMON | LOG_ERR, "Distributor connection closed, send failed: %s", strerror(errno)); + } + else + { + syslog(LOG_DAEMON | LOG_ERR, + "Distributor connection to %.*s:%u closed, send failed: %s", + (int)sizeof(remotes.desc[i].event_serv.peer_addr), + remotes.desc[i].event_serv.peer_addr, + ntohs(remotes.desc[i].event_serv.peer.sin_port), + strerror(errno)); + } + disconnect_client(epollfd, &remotes.desc[i]); + continue; + } + if (bytes_written == 0) + { + syslog(LOG_DAEMON, + "Distributor connection to %.*s:%u closed during write", + (int)sizeof(remotes.desc[i].event_serv.peer_addr), + remotes.desc[i].event_serv.peer_addr, + ntohs(remotes.desc[i].event_serv.peer.sin_port)); + disconnect_client(epollfd, &remotes.desc[i]); + continue; + } + if ((size_t)bytes_written < remotes.desc[i].buf.used) + { + syslog(LOG_DAEMON, + "Distributor wrote less than expected to %.*s:%u: %zd < %zu", + (int)sizeof(remotes.desc[i].event_serv.peer_addr), + remotes.desc[i].event_serv.peer_addr, + ntohs(remotes.desc[i].event_serv.peer.sin_port), + bytes_written, + remotes.desc[i].buf.used); + memmove(remotes.desc[i].buf.ptr, + remotes.desc[i].buf.ptr + bytes_written, + remotes.desc[i].buf.used - bytes_written); + remotes.desc[i].buf.used -= bytes_written; + continue; + } + + remotes.desc[i].buf.used = 0; + } + + memmove(current->buf.ptr, + current->buf.ptr + current->event_json.json_bytes, + current->buf.used - current->event_json.json_bytes); + current->buf.used -= current->event_json.json_bytes; + current->event_json.json_bytes = 0; + } + } + + return 0; +} + +static int mainloop(int epollfd) +{ + struct epoll_event events[32]; + size_t const events_size = sizeof(events) / sizeof(events[0]); + + while (main_thread_shutdown == 0) + { + int nready = epoll_wait(epollfd, events, events_size, -1); + + for (int i = 0; i < nready; i++) + { + if (events[i].events & EPOLLERR) + { + syslog(LOG_DAEMON | LOG_ERR, + "Epoll event error: %s", + (errno != 0 ? strerror(errno) : "Client disconnected")); + if (events[i].data.fd != json_sockfd && events[i].data.fd != serv_sockfd) + { + struct remote_desc * current = (struct remote_desc *)events[i].data.ptr; + disconnect_client(epollfd, current); + } + continue; + } + + if (events[i].data.fd == json_sockfd || events[i].data.fd == serv_sockfd) + { + /* New connection to collector / distributor. */ + if (new_connection(epollfd, events[i].data.fd) != 0) + { + continue; + } + } + else + { + /* Incoming data. */ + if (handle_incoming_data(epollfd, &events[i]) != 0) + { + continue; + } + } + } + } + + return 0; +} + int main(int argc, char ** argv) { + int retval = 1; + if (argc == 0) { return 1; @@ -371,331 +735,7 @@ int main(int argc, char ** argv) goto error; } - struct epoll_event events[32]; - size_t const events_size = sizeof(events) / sizeof(events[0]); - while (main_thread_shutdown == 0) - { - struct remote_desc * current = NULL; - int nready = epoll_wait(epollfd, events, events_size, -1); - - for (int i = 0; i < nready; i++) - { - if (events[i].events & EPOLLERR) - { - syslog(LOG_DAEMON | LOG_ERR, - "Epoll event error: %s", - (errno != 0 ? strerror(errno) : "Client disconnected")); - if (events[i].data.fd != json_sockfd && events[i].data.fd != serv_sockfd) - { - current = (struct remote_desc *)events[i].data.ptr; - disconnect_client(epollfd, current); - } - continue; - } - - /* New connection to collector / distributor. */ - if (events[i].data.fd == json_sockfd || events[i].data.fd == serv_sockfd) - { - current = get_unused_remote_descriptor(); - if (current == NULL) - { - syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes.desc_used); - continue; - } - current->type = (events[i].data.fd == json_sockfd ? JSON_SOCK : SERV_SOCK); - - int sockfd = (current->type == JSON_SOCK ? json_sockfd : serv_sockfd); - socklen_t peer_addr_len = - (current->type == JSON_SOCK ? sizeof(current->event_json.peer) : sizeof(current->event_serv.peer)); - - current->fd = accept(sockfd, - (current->type == JSON_SOCK ? (struct sockaddr *)¤t->event_json.peer - : (struct sockaddr *)¤t->event_serv.peer), - &peer_addr_len); - if (current->fd < 0) - { - syslog(LOG_DAEMON | LOG_ERR, "Accept failed: %s", strerror(errno)); - disconnect_client(epollfd, current); - continue; - } - - switch (current->type) - { - case JSON_SOCK: - current->event_json.json_bytes = 0; - syslog(LOG_DAEMON, "New collector connection"); - break; - case SERV_SOCK: - if (inet_ntop(current->event_serv.peer.sin_family, - ¤t->event_serv.peer.sin_addr, - ¤t->event_serv.peer_addr[0], - sizeof(current->event_serv.peer_addr)) == NULL) - { - if (errno == EAFNOSUPPORT) - { - syslog(LOG_DAEMON | LOG_ERR, "New distributor connection."); - } - else - { - syslog(LOG_DAEMON | LOG_ERR, - "Error converting an internet address: %s", - strerror(errno)); - } - current->event_serv.peer_addr[0] = '\0'; - } - else - { - syslog(LOG_DAEMON, - "New distributor connection from %.*s:%u", - (int)sizeof(current->event_serv.peer_addr), - current->event_serv.peer_addr, - ntohs(current->event_serv.peer.sin_port)); - } - break; - } - - /* nonblocking fd is mandatory */ - int fd_flags = fcntl(current->fd, F_GETFL, 0); - if (fd_flags == -1 || fcntl(current->fd, F_SETFL, fd_flags | O_NONBLOCK) == -1) - { - syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno)); - disconnect_client(epollfd, current); - continue; - } - - /* shutdown writing end for collector clients */ - if (current->type == JSON_SOCK) - { - shutdown(current->fd, SHUT_WR); // collector - /* setup epoll event */ - struct epoll_event accept_event = {}; - accept_event.data.ptr = current; - accept_event.events = EPOLLIN; - if (epoll_ctl(epollfd, EPOLL_CTL_ADD, current->fd, &accept_event) < 0) - { - disconnect_client(epollfd, current); - continue; - } - } - else - { - shutdown(current->fd, SHUT_RD); // distributor - } - } - else - { - current = (struct remote_desc *)events[i].data.ptr; - - if (current->fd < 0) - { - syslog(LOG_DAEMON | LOG_ERR, "file descriptor `%d' got from event data invalid", current->fd); - continue; - } - - if (events[i].events & EPOLLIN && current->type == JSON_SOCK) - { - /* read JSON strings (or parts) from the UNIX socket (collecting) */ - if (current->buf.used == current->buf.max) - { - syslog(LOG_DAEMON, "Collector read buffer full. No more read possible."); - } - else - { - errno = 0; - ssize_t bytes_read = read(current->fd, - current->buf.ptr + current->buf.used, - current->buf.max - current->buf.used); - if (bytes_read < 0 || errno != 0) - { - disconnect_client(epollfd, current); - continue; - } - if (bytes_read == 0) - { - syslog(LOG_DAEMON, "Collector connection closed during read"); - disconnect_client(epollfd, current); - continue; - } - current->buf.used += bytes_read; - } - - while (current->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1) - { - if (current->buf.ptr[NETWORK_BUFFER_LENGTH_DIGITS] != '{') - { - syslog(LOG_DAEMON | LOG_ERR, - "BUG: JSON invalid opening character: '%c'", - current->buf.ptr[NETWORK_BUFFER_LENGTH_DIGITS]); - disconnect_client(epollfd, current); - break; - } - - errno = 0; - char * json_str_start = NULL; - current->event_json.json_bytes = strtoull((char *)current->buf.ptr, &json_str_start, 10); - current->event_json.json_bytes += (uint8_t *)json_str_start - current->buf.ptr; - - if (errno == ERANGE) - { - syslog(LOG_DAEMON | LOG_ERR, "BUG: Size of JSON exceeds limit"); - disconnect_client(epollfd, current); - break; - } - if ((uint8_t *)json_str_start == current->buf.ptr) - { - syslog(LOG_DAEMON | LOG_ERR, - "BUG: Missing size before JSON string: \"%.*s\"", - NETWORK_BUFFER_LENGTH_DIGITS, - current->buf.ptr); - disconnect_client(epollfd, current); - break; - } - if (current->event_json.json_bytes > current->buf.max) - { - syslog(LOG_DAEMON | LOG_ERR, - "BUG: JSON string too big: %llu > %zu", - current->event_json.json_bytes, - current->buf.max); - disconnect_client(epollfd, current); - break; - } - if (current->event_json.json_bytes > current->buf.used) - { - break; - } - - if (current->buf.ptr[current->event_json.json_bytes - 2] != '}' || - current->buf.ptr[current->event_json.json_bytes - 1] != '\n') - { - syslog(LOG_DAEMON | LOG_ERR, - "BUG: Invalid JSON string: %.*s", - (int)current->event_json.json_bytes, - current->buf.ptr); - disconnect_client(epollfd, current); - break; - } - - for (size_t i = 0; i < remotes.desc_size; ++i) - { - if (remotes.desc[i].fd < 0) - { - continue; - } - if (remotes.desc[i].type != SERV_SOCK) - { - continue; - } - if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used) - { - syslog(LOG_DAEMON | LOG_ERR, - "Buffer capacity threshold (%zu of max %zu bytes) reached, " - "falling back to blocking mode.", - remotes.desc[i].buf.used, - remotes.desc[i].buf.max); - /* - * FIXME: Maybe switch to a Multithreading distributor data transmission, - * so that we do not have to switch back to blocking mode here! - * NOTE: If *one* distributer peer is too slow, all other distributors are - * affected by this. This causes starvation and leads to a possible data loss on - * the nDPId collector side. - */ - int fd_flags = fcntl(remotes.desc[i].fd, F_GETFL, 0); - if (fd_flags == -1 || fcntl(remotes.desc[i].fd, F_SETFL, fd_flags & ~O_NONBLOCK) == -1) - { - syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno)); - disconnect_client(epollfd, &remotes.desc[i]); - continue; - } - if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used) != - (ssize_t)remotes.desc[i].buf.used) - { - syslog(LOG_DAEMON | LOG_ERR, - "Could not drain buffer by %zu bytes. (forced)", - remotes.desc[i].buf.used); - disconnect_client(epollfd, &remotes.desc[i]); - continue; - } - remotes.desc[i].buf.used = 0; - if (fcntl(remotes.desc[i].fd, F_SETFL, fd_flags) == -1) - { - syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno)); - disconnect_client(epollfd, &remotes.desc[i]); - continue; - } - } - - memcpy(remotes.desc[i].buf.ptr + remotes.desc[i].buf.used, - current->buf.ptr, - current->event_json.json_bytes); - remotes.desc[i].buf.used += current->event_json.json_bytes; - - errno = 0; - ssize_t bytes_written = - write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used); - if (errno == EAGAIN) - { - continue; - } - if (bytes_written < 0 || errno != 0) - { - if (remotes.desc[i].event_serv.peer_addr[0] == '\0') - { - syslog(LOG_DAEMON | LOG_ERR, - "Distributor connection closed, send failed: %s", - strerror(errno)); - } - else - { - syslog(LOG_DAEMON | LOG_ERR, - "Distributor connection to %.*s:%u closed, send failed: %s", - (int)sizeof(remotes.desc[i].event_serv.peer_addr), - remotes.desc[i].event_serv.peer_addr, - ntohs(remotes.desc[i].event_serv.peer.sin_port), - strerror(errno)); - } - disconnect_client(epollfd, &remotes.desc[i]); - continue; - } - if (bytes_written == 0) - { - syslog(LOG_DAEMON, - "Distributor connection to %.*s:%u closed during write", - (int)sizeof(remotes.desc[i].event_serv.peer_addr), - remotes.desc[i].event_serv.peer_addr, - ntohs(remotes.desc[i].event_serv.peer.sin_port)); - disconnect_client(epollfd, &remotes.desc[i]); - continue; - } - if ((size_t)bytes_written < remotes.desc[i].buf.used) - { - syslog(LOG_DAEMON, - "Distributor wrote less than expected to %.*s:%u: %zd < %zu", - (int)sizeof(remotes.desc[i].event_serv.peer_addr), - remotes.desc[i].event_serv.peer_addr, - ntohs(remotes.desc[i].event_serv.peer.sin_port), - bytes_written, - remotes.desc[i].buf.used); - memmove(remotes.desc[i].buf.ptr, - remotes.desc[i].buf.ptr + bytes_written, - remotes.desc[i].buf.used - bytes_written); - remotes.desc[i].buf.used -= bytes_written; - continue; - } - - remotes.desc[i].buf.used = 0; - } - - memmove(current->buf.ptr, - current->buf.ptr + current->event_json.json_bytes, - current->buf.used - current->event_json.json_bytes); - current->buf.used -= current->event_json.json_bytes; - current->event_json.json_bytes = 0; - } - } - } - } - } - + retval = mainloop(epollfd); error: close(json_sockfd); close(serv_sockfd); @@ -707,5 +747,5 @@ error: unlink(json_sockpath); unlink(serv_optarg); - return 0; + return retval; } |