diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2022-03-10 13:51:21 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2022-03-10 14:26:07 +0100 |
commit | 41757ecf1cbcbcd890c1ab7e08995aaffe031752 (patch) | |
tree | 0421cdcc7573aa40fc1e198f16f02913e5e7bcd0 /nDPIsrvd.c | |
parent | 6f1f9e65ea86bba7c944b183e7d413a14f71852d (diff) |
Added nDPIsrvd TCP/IP support for distributors.
* nDPIsrvd: Improved distributor client disconnect detection
* nDPIsrvd: Fixed invalid usage of epoll_add instead of epoll_mod
* nPDIsrvd: Improved logging for distributor clients
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r-- | nDPIsrvd.c | 633 |
1 files changed, 404 insertions, 229 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 791901f78..454e4a85f 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -20,8 +20,9 @@ enum sock_type { - JSON_SOCK, - SERV_SOCK + COLLECTOR_UN, + DISTRIBUTOR_UN, + DISTRIBUTOR_IN, }; struct remote_desc @@ -34,16 +35,21 @@ struct remote_desc { struct { - int json_sockfd; + int collector_sockfd; struct sockaddr_un peer; unsigned long long int json_bytes; - } event_json; + } event_collector_un; struct { - int serv_sockfd; + int distributor_sockfd; + struct sockaddr_un peer; + } event_distributor_un; /* UNIX socket */ + struct + { + int distributor_sockfd; struct sockaddr_in peer; char peer_addr[INET_ADDRSTRLEN]; - } event_serv; + } event_distributor_in; /* TCP/IP socket */ }; }; @@ -55,17 +61,19 @@ static struct } remotes = {NULL, 0, 0}; static int nDPIsrvd_main_thread_shutdown = 0; -static int json_sockfd; -static int serv_sockfd; -static struct nDPIsrvd_address serv_address = { +static int collector_un_sockfd = -1; +static int distributor_un_sockfd = -1; +static int distributor_in_sockfd = -1; +static struct nDPIsrvd_address distributor_in_address = { .raw.sa_family = 0xFFFF, }; static struct { char * pidfile; - char * json_sockpath; - char * serv_optarg; + char * collector_un_sockpath; + char * distributor_un_sockpath; + char * distributor_in_address; char * user; char * group; nDPIsrvd_ull cache_array_length; @@ -74,10 +82,10 @@ static struct static int fcntl_add_flags(int fd, int flags); static int fcntl_del_flags(int fd, int flags); +static int add_in_event_fd(int epollfd, int fd); +static int add_in_event(int epollfd, struct remote_desc * const remote); +static int del_out_event(int epollfd, struct remote_desc * const remote); static void disconnect_client(int epollfd, struct remote_desc * const current); -static int add_in_event(int epollfd, int fd, void * ptr); -static int add_out_event(int epollfd, int fd, void * ptr); -static int del_event(int epollfd, int fd); static int drain_cache_blocking(struct remote_desc * const remote); static void nDPIsrvd_buffer_array_copy(void * dst, const void * src) @@ -154,6 +162,39 @@ static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, return 0; } +static void logger_distributor(struct remote_desc * const remote, + char const * const prefix, + char const * const format, + ...) +{ + char logbuf[512]; + va_list ap; + + va_start(ap, format); + vsnprintf(logbuf, sizeof(logbuf), format, ap); + + if (remote->sock_type == DISTRIBUTOR_UN) + { + logger(1, "%s %s", prefix, logbuf); + } + else if (remote->sock_type == DISTRIBUTOR_IN) + { + logger(1, + "%s %.*s:%u %s", + prefix, + (int)sizeof(remote->event_distributor_in.peer_addr), + remote->event_distributor_in.peer_addr, + ntohs(remote->event_distributor_in.peer.sin_port), + logbuf); + } + else + { + logger(1, "BUG: Distributor logging interface called with an collector/invalid remote"); + } + + va_end(ap); +} + static int drain_main_buffer(struct remote_desc * const remote) { if (remote->buf.used == 0) @@ -169,46 +210,19 @@ static int drain_main_buffer(struct remote_desc * const remote) } if (bytes_written < 0 || errno != 0) { - if (remote->event_serv.peer_addr[0] == '\0') - { - logger(1, "Distributor connection closed, send failed: %s", strerror(errno)); - } - else - { - logger(1, - "Distributor connection to %.*s:%u closed, send failed: %s", - (int)sizeof(remote->event_serv.peer_addr), - remote->event_serv.peer_addr, - ntohs(remote->event_serv.peer.sin_port), - strerror(errno)); - } + logger_distributor(remote, "Distributor connection", "%d closed, send failed: %s", remote->fd, strerror(errno)); return -1; } if (bytes_written == 0) { - if (remote->event_serv.peer_addr[0] == '\0') - { - logger(0, "%s", "Distributor connection closed during write"); - } - else - { - logger(0, - "Distributor connection to %.*s:%u closed during write", - (int)sizeof(remote->event_serv.peer_addr), - remote->event_serv.peer_addr, - ntohs(remote->event_serv.peer.sin_port)); - } + logger_distributor(remote, "Distributor connection", "%d closed", remote->fd); return -1; } if ((size_t)bytes_written < remote->buf.used) { #if 0 - logger(0, "Distributor wrote less than expected to %.*s:%u: %zd < %zu", - (int)sizeof(remote->event_serv.peer_addr), - remote->event_serv.peer_addr, - ntohs(remote->event_serv.peer.sin_port), - bytes_written, - remote->buf.used); + logger_distributor( + remote, "Distributor", "wrote less than expected: %zd < %zu", bytes_written, remote->buf.used); #endif memmove(remote->buf.ptr.raw, remote->buf.ptr.raw + bytes_written, remote->buf.used - bytes_written); } @@ -278,7 +292,7 @@ static int drain_cache_blocking(struct remote_desc * const remote) static int handle_outgoing_data(int epollfd, struct remote_desc * const remote) { - if (remote->sock_type != SERV_SOCK) + if (remote->sock_type != DISTRIBUTOR_UN && remote->sock_type != DISTRIBUTOR_IN) { return -1; } @@ -289,7 +303,7 @@ static int handle_outgoing_data(int epollfd, struct remote_desc * const remote) } if (utarray_len(remote->buf_cache) == 0) { - del_event(epollfd, remote->fd); + return del_out_event(epollfd, remote); } return 0; @@ -321,73 +335,150 @@ static int fcntl_del_flags(int fd, int flags) static int create_listen_sockets(void) { - json_sockfd = socket(AF_UNIX, SOCK_STREAM, 0); - serv_sockfd = socket(serv_address.raw.sa_family, SOCK_STREAM, 0); - if (json_sockfd < 0 || serv_sockfd < 0) + collector_un_sockfd = socket(AF_UNIX, SOCK_STREAM, 0); + distributor_un_sockfd = socket(AF_UNIX, SOCK_STREAM, 0); + if (collector_un_sockfd < 0 || distributor_un_sockfd < 0) { - logger(1, "Error opening socket: %s", strerror(errno)); + logger(1, "Error creating UNIX socket: %s", strerror(errno)); return 1; } - int opt = 1; - if (setsockopt(json_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0 || - setsockopt(serv_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) + if (nDPIsrvd_options.distributor_in_address != NULL) { - logger(1, "setsockopt with SO_REUSEADDR failed: %s", strerror(errno)); + distributor_in_sockfd = socket(distributor_in_address.raw.sa_family, SOCK_STREAM, 0); + if (distributor_in_sockfd < 0) + { + logger(1, "Error creating TCP/IP socket: %s", strerror(errno)); + return 1; + } + int opt = 1; + if (setsockopt(distributor_in_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) + { + logger(1, "TCP/IP socket setsockopt(SO_REUSEADDR) failed: %s", strerror(errno)); + } } - struct sockaddr_un json_addr; - json_addr.sun_family = AF_UNIX; - int written = snprintf(json_addr.sun_path, sizeof(json_addr.sun_path), "%s", nDPIsrvd_options.json_sockpath); - if (written < 0) { - logger(1, "snprintf failed: %s", strerror(errno)); - return 1; + int opt = 1; + if (setsockopt(collector_un_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0 || + setsockopt(distributor_un_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) + { + logger(1, "UNIX socket setsockopt(SO_REUSEADDR) failed: %s", strerror(errno)); + } } - else if (written == sizeof(json_addr.sun_path)) + { - logger(1, - "JSON socket path too long, current/max: %zu/%zu", - strlen(nDPIsrvd_options.json_sockpath), - sizeof(json_addr.sun_path) - 1); - return 1; + struct sockaddr_un collector_addr; + collector_addr.sun_family = AF_UNIX; + int written = snprintf(collector_addr.sun_path, + sizeof(collector_addr.sun_path), + "%s", + nDPIsrvd_options.collector_un_sockpath); + if (written < 0) + { + logger(1, "snprintf failed: %s", strerror(errno)); + return 1; + } + else if (written == sizeof(collector_addr.sun_path)) + { + logger(1, + "Collector UNIX socket path too long, current/max: %zu/%zu", + strlen(nDPIsrvd_options.collector_un_sockpath), + sizeof(collector_addr.sun_path) - 1); + return 1; + } + + if (bind(collector_un_sockfd, (struct sockaddr *)&collector_addr, sizeof(collector_addr)) < 0) + { + logger(1, + "Error binding Collector UNIX socket to `%s': %s", + nDPIsrvd_options.collector_un_sockpath, + strerror(errno)); + return 1; + } } - if (bind(json_sockfd, (struct sockaddr *)&json_addr, sizeof(json_addr)) < 0) { - unlink(nDPIsrvd_options.json_sockpath); - logger(1, - "Error on binding UNIX socket (collector) to %s: %s", - nDPIsrvd_options.json_sockpath, - strerror(errno)); - return 1; + struct sockaddr_un distributor_addr; + distributor_addr.sun_family = AF_UNIX; + int written = snprintf(distributor_addr.sun_path, + sizeof(distributor_addr.sun_path), + "%s", + nDPIsrvd_options.distributor_un_sockpath); + if (written < 0) + { + logger(1, "snprintf failed: %s", strerror(errno)); + return 2; + } + else if (written == sizeof(distributor_addr.sun_path)) + { + logger(1, + "Distributor UNIX socket path too long, current/max: %zu/%zu", + strlen(nDPIsrvd_options.distributor_un_sockpath), + sizeof(distributor_addr.sun_path) - 1); + return 2; + } + + if (bind(distributor_un_sockfd, (struct sockaddr *)&distributor_addr, sizeof(distributor_addr)) < 0) + { + logger(1, + "Error binding Distributor socket to `%s': %s", + nDPIsrvd_options.distributor_un_sockpath, + strerror(errno)); + return 2; + } } - if (bind(serv_sockfd, &serv_address.raw, serv_address.size) < 0) + if (nDPIsrvd_options.distributor_in_address != NULL) { - logger(1, "Error on binding socket (distributor) to %s: %s", nDPIsrvd_options.serv_optarg, strerror(errno)); - unlink(nDPIsrvd_options.json_sockpath); - return 1; + if (bind(distributor_in_sockfd, &distributor_in_address.raw, distributor_in_address.size) < 0) + { + logger(1, + "Error binding Distributor TCP/IP socket to %s: %s", + nDPIsrvd_options.distributor_in_address, + strerror(errno)); + return 3; + } + if (listen(distributor_in_sockfd, 16) < 0) + { + logger(1, + "Error listening Distributor TCP/IP socket to %s: %s", + nDPIsrvd_options.distributor_in_address, + strerror(errno)); + return 3; + } + if (fcntl_add_flags(distributor_in_sockfd, O_NONBLOCK) != 0) + { + logger(1, + "Error setting Distributor TCP/IP socket %s to non-blocking mode: %s", + nDPIsrvd_options.distributor_in_address, + strerror(errno)); + return 3; + } } - if (listen(json_sockfd, 16) < 0 || listen(serv_sockfd, 16) < 0) + if (listen(collector_un_sockfd, 16) < 0 || listen(distributor_un_sockfd, 16) < 0) { - unlink(nDPIsrvd_options.json_sockpath); - logger(1, "Error on listen: %s", strerror(errno)); - return 1; + logger(1, "Error listening UNIX socket: %s", strerror(errno)); + return 3; } - if (fcntl_add_flags(json_sockfd, O_NONBLOCK) != 0) + if (fcntl_add_flags(collector_un_sockfd, O_NONBLOCK) != 0) { - unlink(nDPIsrvd_options.json_sockpath); - logger(1, "Error setting fd flags for the collector socket: %s", strerror(errno)); - return 1; + logger(1, + "Error setting Collector UNIX socket `%s' to non-blocking mode: %s", + nDPIsrvd_options.collector_un_sockpath, + strerror(errno)); + return 3; } - if (fcntl_add_flags(serv_sockfd, O_NONBLOCK) != 0) + if (fcntl_add_flags(distributor_un_sockfd, O_NONBLOCK) != 0) { - logger(1, "Error setting fd flags for the distributor socket: %s", strerror(errno)); - return 1; + logger(1, + "Error setting Distributor UNIX socket `%s' to non-blocking mode: %s", + nDPIsrvd_options.distributor_un_sockpath, + strerror(errno)); + return 3; } return 0; @@ -405,7 +496,10 @@ static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, in if (remotes.desc[i].fd == -1) { remotes.desc_used++; - utarray_new(remotes.desc[i].buf_cache, &nDPIsrvd_buffer_array_icd); + if (remotes.desc[i].buf_cache == NULL) + { + utarray_new(remotes.desc[i].buf_cache, &nDPIsrvd_buffer_array_icd); + } if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, max_buffer_size) != 0 || remotes.desc[i].buf_cache == NULL) { return NULL; @@ -449,14 +543,34 @@ static int add_event(int epollfd, int events, int fd, void * ptr) return epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); } -static int add_in_event(int epollfd, int fd, void * ptr) +static int add_in_event_fd(int epollfd, int fd) +{ + return add_event(epollfd, EPOLLIN, fd, NULL); +} + +static int add_in_event(int epollfd, struct remote_desc * const remote) +{ + return add_event(epollfd, EPOLLIN, remote->fd, remote); +} + +static int mod_event(int epollfd, int events, int fd, void * ptr) +{ + struct epoll_event event = {}; + + event.data.ptr = ptr; + event.events = events; + + return epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event); +} + +static int add_out_event(int epollfd, struct remote_desc * const remote) { - return add_event(epollfd, EPOLLIN, fd, ptr); + return mod_event(epollfd, EPOLLIN | EPOLLOUT, remote->fd, remote); } -static int add_out_event(int epollfd, int fd, void * ptr) +static int del_out_event(int epollfd, struct remote_desc * const remote) { - return add_event(epollfd, EPOLLOUT, fd, ptr); + return mod_event(epollfd, EPOLLIN, remote->fd, remote); } static int del_event(int epollfd, int fd) @@ -487,7 +601,7 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) { int opt; - while ((opt = getopt(argc, argv, "lL:c:dp:s:u:g:C:Dvh")) != -1) + while ((opt = getopt(argc, argv, "lL:c:dp:s:S:u:g:C:Dvh")) != -1) { switch (opt) { @@ -501,8 +615,8 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) } break; case 'c': - free(nDPIsrvd_options.json_sockpath); - nDPIsrvd_options.json_sockpath = strdup(optarg); + free(nDPIsrvd_options.collector_un_sockpath); + nDPIsrvd_options.collector_un_sockpath = strdup(optarg); break; case 'd': daemonize_enable(); @@ -512,8 +626,12 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) nDPIsrvd_options.pidfile = strdup(optarg); break; case 's': - free(nDPIsrvd_options.serv_optarg); - nDPIsrvd_options.serv_optarg = strdup(optarg); + free(nDPIsrvd_options.distributor_un_sockpath); + nDPIsrvd_options.distributor_un_sockpath = strdup(optarg); + break; + case 'S': + free(nDPIsrvd_options.distributor_in_address); + nDPIsrvd_options.distributor_in_address = strdup(optarg); break; case 'u': free(nDPIsrvd_options.user); @@ -541,7 +659,8 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) fprintf(stderr, "%s\n", get_nDPId_version()); fprintf(stderr, "Usage: %s [-l] [-L logfile] [-c path-to-unix-sock] [-d] [-p pidfile]\n" - "\t[-s path-to-unix-socket|distributor-host:port] [-u user] [-g group]\n" + "\t[-s path-to-distributor-unix-socket] [-S distributor-host:port]\n" + "\t[-u user] [-g group]\n" "\t[-C max-buffered-collector-json-lines] [-D]\n" "\t[-v] [-h]\n", argv[0]); @@ -558,33 +677,45 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) return 1; } - if (nDPIsrvd_options.json_sockpath == NULL) + if (nDPIsrvd_options.collector_un_sockpath == NULL) { - nDPIsrvd_options.json_sockpath = strdup(COLLECTOR_UNIX_SOCKET); + nDPIsrvd_options.collector_un_sockpath = strdup(COLLECTOR_UNIX_SOCKET); } - if (is_path_absolute("JSON socket", nDPIsrvd_options.json_sockpath) != 0) + if (is_path_absolute("Collector UNIX socket", nDPIsrvd_options.collector_un_sockpath) != 0) { return 1; } - if (nDPIsrvd_options.serv_optarg == NULL) + if (nDPIsrvd_options.distributor_un_sockpath == NULL) { - nDPIsrvd_options.serv_optarg = strdup(DISTRIBUTOR_UNIX_SOCKET); + nDPIsrvd_options.distributor_un_sockpath = strdup(DISTRIBUTOR_UNIX_SOCKET); } - - if (nDPIsrvd_setup_address(&serv_address, nDPIsrvd_options.serv_optarg) != 0) + if (is_path_absolute("Distributor UNIX socket", nDPIsrvd_options.distributor_un_sockpath) != 0) { - logger_early(1, "%s: Could not parse address `%s'\n", argv[0], nDPIsrvd_options.serv_optarg); return 1; } - if (serv_address.raw.sa_family == AF_UNIX && is_path_absolute("SERV socket", nDPIsrvd_options.serv_optarg) != 0) + + if (nDPIsrvd_options.distributor_in_address != NULL) { - return 1; + if (nDPIsrvd_setup_address(&distributor_in_address, nDPIsrvd_options.distributor_in_address) != 0) + { + logger_early(1, "%s: Could not parse address %s", argv[0], nDPIsrvd_options.distributor_in_address); + return 1; + } + if (distributor_in_address.raw.sa_family == AF_UNIX) + { + logger_early(1, + "%s: You've requested to setup another UNIX socket `%s', but there is already one at `%s'", + argv[0], + nDPIsrvd_options.distributor_in_address, + nDPIsrvd_options.distributor_un_sockpath); + return 1; + } } if (optind < argc) { - logger_early(1, "%s: Unexpected argument after options\n", argv[0]); + logger_early(1, "%s: Unexpected argument after options", argv[0]); return 1; } @@ -617,24 +748,31 @@ static int new_connection(int epollfd, int eventfd) { union { - struct sockaddr_un event_json; - struct sockaddr_un event_serv; + struct sockaddr_un saddr_collector_un; + struct sockaddr_un saddr_distributor_un; + struct sockaddr_in saddr_distributor_in; } sockaddr; socklen_t peer_addr_len; enum sock_type stype; int server_fd; - if (eventfd == json_sockfd) + if (eventfd == collector_un_sockfd) { - peer_addr_len = sizeof(sockaddr.event_json); - stype = JSON_SOCK; - server_fd = json_sockfd; + peer_addr_len = sizeof(sockaddr.saddr_collector_un); + stype = COLLECTOR_UN; + server_fd = collector_un_sockfd; } - else if (eventfd == serv_sockfd) + else if (eventfd == distributor_un_sockfd) { - peer_addr_len = sizeof(sockaddr.event_serv); - stype = SERV_SOCK; - server_fd = serv_sockfd; + peer_addr_len = sizeof(sockaddr.saddr_distributor_un); + stype = DISTRIBUTOR_UN; + server_fd = distributor_un_sockfd; + } + else if (eventfd == distributor_in_sockfd) + { + peer_addr_len = sizeof(sockaddr.saddr_distributor_in); + stype = DISTRIBUTOR_IN; + server_fd = distributor_in_sockfd; } else { @@ -647,63 +785,65 @@ static int new_connection(int epollfd, int eventfd) return 1; } - char const * sock_type = NULL; - int sockopt = NETWORK_BUFFER_MAX_SIZE; + int sockopt; switch (current->sock_type) { - case JSON_SOCK: - sock_type = "collector"; - current->event_json.json_bytes = 0; - logger(0, "New collector connection"); + case COLLECTOR_UN: + current->event_collector_un.peer = sockaddr.saddr_collector_un; + current->event_collector_un.json_bytes = 0; + logger(1, "New collector connection"); + sockopt = NETWORK_BUFFER_MAX_SIZE; if (setsockopt(current->fd, SOL_SOCKET, SO_RCVBUF, &sockopt, sizeof(sockopt)) < 0) { logger(1, "Error setting socket option SO_RCVBUF: %s", strerror(errno)); return 1; } break; - case SERV_SOCK: - sock_type = "distributor"; + case DISTRIBUTOR_UN: + case DISTRIBUTOR_IN: + if (current->sock_type == DISTRIBUTOR_UN) + { + current->event_distributor_un.peer = sockaddr.saddr_distributor_un; + } + else + { + current->event_distributor_in.peer = sockaddr.saddr_distributor_in; + sockopt = 1; + if (setsockopt(current->fd, SOL_SOCKET, SO_RCVBUF, &sockopt, sizeof(sockopt)) < 0) + { + logger(1, "Error setting socket option SO_RCVBUF: %s", strerror(errno)); + return 1; + } + } + + sockopt = NETWORK_BUFFER_MAX_SIZE; if (setsockopt(current->fd, SOL_SOCKET, SO_SNDBUF, &sockopt, sizeof(sockopt)) < 0) { logger(1, "Error setting socket option SO_SNDBUF: %s", strerror(errno)); return 1; } - if (inet_ntop(current->event_serv.peer.sin_family, - ¤t->event_serv.peer.sin_addr, - ¤t->event_serv.peer_addr[0], - sizeof(current->event_serv.peer_addr)) == NULL) + if (inet_ntop(current->event_distributor_in.peer.sin_family, + ¤t->event_distributor_in.peer.sin_addr, + ¤t->event_distributor_in.peer_addr[0], + sizeof(current->event_distributor_in.peer_addr)) == NULL) { - if (errno == EAFNOSUPPORT) - { - logger(0, "%s", "New distributor connection."); - } - else + if (errno != EAFNOSUPPORT) { logger(1, "Error converting an internet address: %s", strerror(errno)); } - current->event_serv.peer_addr[0] = '\0'; - } - else - { - logger(0, - "New distributor connection from %.*s:%u", - (int)sizeof(current->event_serv.peer_addr), - current->event_serv.peer_addr, - ntohs(current->event_serv.peer.sin_port)); + current->event_distributor_in.peer_addr[0] = '\0'; } + logger_distributor(current, "New distributor connection from", "%d", current->fd); + { struct timeval send_timeout = {1, 0}; if (setsockopt(current->fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&send_timeout, sizeof(send_timeout)) != 0) { logger(1, "Error setting socket option send timeout: %s", strerror(errno)); } - if (setsockopt(current->fd, SOL_SOCKET, SO_SNDBUF, &sockopt, sizeof(sockopt)) < 0) - { - logger(1, "Error setting socket option SO_SNDBUF: %s", strerror(errno)); - } } break; } @@ -711,25 +851,24 @@ static int new_connection(int epollfd, int eventfd) /* nonblocking fd is mandatory */ if (fcntl_add_flags(current->fd, O_NONBLOCK) != 0) { - logger(1, "Error setting %s fd flags: %s", sock_type, strerror(errno)); + logger(1, "Error setting fd flags: %s", strerror(errno)); disconnect_client(epollfd, current); return 1; } /* shutdown writing end for collector clients */ - if (current->sock_type == JSON_SOCK) + if (current->sock_type == COLLECTOR_UN) { shutdown(current->fd, SHUT_WR); // collector - /* setup epoll event */ - if (add_in_event(epollfd, current->fd, current) != 0) - { - disconnect_client(epollfd, current); - return 1; - } + /* shutdown reading end for distributor clients does not work due to epoll usage */ } - else + + /* setup epoll event */ + if (add_in_event(epollfd, current) != 0) { - shutdown(current->fd, SHUT_RD); // distributor + logger(1, "Error adding input event to %d: %s", current->fd, strerror(errno)); + disconnect_client(epollfd, current); + return 1; } return 0; @@ -747,8 +886,8 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur } errno = 0; - current->event_json.json_bytes = strtoull((char *)current->buf.ptr.text, &json_str_start, 10); - current->event_json.json_bytes += json_str_start - current->buf.ptr.text; + current->event_collector_un.json_bytes = strtoull((char *)current->buf.ptr.text, &json_str_start, 10); + current->event_collector_un.json_bytes += json_str_start - current->buf.ptr.text; if (errno == ERANGE) { @@ -776,22 +915,22 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur (long int)(json_str_start - current->buf.ptr.text)); } - if (current->event_json.json_bytes > current->buf.max) + if (current->event_collector_un.json_bytes > current->buf.max) { - logger(1, "BUG: JSON string too big: %llu > %zu", current->event_json.json_bytes, current->buf.max); + logger(1, "BUG: JSON string too big: %llu > %zu", current->event_collector_un.json_bytes, current->buf.max); disconnect_client(epollfd, current); return 1; } - if (current->event_json.json_bytes > current->buf.used) + if (current->event_collector_un.json_bytes > current->buf.used) { return 1; } - if (current->buf.ptr.text[current->event_json.json_bytes - 2] != '}' || - current->buf.ptr.text[current->event_json.json_bytes - 1] != '\n') + if (current->buf.ptr.text[current->event_collector_un.json_bytes - 2] != '}' || + current->buf.ptr.text[current->event_collector_un.json_bytes - 1] != '\n') { - logger(1, "BUG: Invalid JSON string: %.*s", (int)current->event_json.json_bytes, current->buf.ptr.text); + logger(1, "BUG: Invalid JSON string: %.*s", (int)current->event_collector_un.json_bytes, current->buf.ptr.text); disconnect_client(epollfd, current); return 1; } @@ -801,8 +940,19 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur static int handle_incoming_data(int epollfd, struct remote_desc * const current) { - if (current->sock_type != JSON_SOCK) + if (current->sock_type != COLLECTOR_UN) { + unsigned char garbage = 0; + + if (read(current->fd, &garbage, sizeof(garbage)) == sizeof(garbage)) + { + logger_distributor(current, "Received data from", "%d which is not allowed to send us some.", current->fd); + } + else + { + logger_distributor(current, "Distributor connection", "%d closed", current->fd); + } + disconnect_client(epollfd, current); return 1; } @@ -818,6 +968,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) read(current->fd, current->buf.ptr.raw + current->buf.used, current->buf.max - current->buf.used); if (bytes_read < 0 || errno != 0) { + logger(1, "Could not read remote: %s", strerror(errno)); disconnect_client(epollfd, current); return 1; } @@ -843,28 +994,31 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) { continue; } - if (remotes.desc[i].sock_type != SERV_SOCK) + if (remotes.desc[i].sock_type != DISTRIBUTOR_UN && remotes.desc[i].sock_type != DISTRIBUTOR_IN) { continue; } - if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used || + + if (current->event_collector_un.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used || utarray_len(remotes.desc[i].buf_cache) > 0) { if (utarray_len(remotes.desc[i].buf_cache) == 0) { #if 0 - logger(0, "Buffer capacity threshold (%zu bytes) reached, caching JSON strings.", remotes.desc[i].buf.used); + logger_distributor(&remotes.desc[i], + "Distributor", + "buffer capacity threshold (%zu bytes) reached, caching JSON strings.", + remotes.desc[i].buf.used); #endif errno = 0; - if (add_out_event(epollfd, remotes.desc[i].fd, &remotes.desc[i]) != 0 && - errno != EEXIST /* required for nDPId-test */) + if (add_out_event(epollfd, &remotes.desc[i]) != 0) { logger(1, "%s: %s", "Could not add event, disconnecting", strerror(errno)); disconnect_client(epollfd, &remotes.desc[i]); continue; } } - if (add_to_cache(&remotes.desc[i], current->buf.ptr.raw, current->event_json.json_bytes) != 0) + if (add_to_cache(&remotes.desc[i], current->buf.ptr.raw, current->event_collector_un.json_bytes) != 0) { disconnect_client(epollfd, &remotes.desc[i]); continue; @@ -874,8 +1028,8 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) { memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used, current->buf.ptr.raw, - current->event_json.json_bytes); - remotes.desc[i].buf.used += current->event_json.json_bytes; + current->event_collector_un.json_bytes); + remotes.desc[i].buf.used += current->event_collector_un.json_bytes; } if (drain_main_buffer(&remotes.desc[i]) != 0) @@ -885,10 +1039,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) } memmove(current->buf.ptr.raw, - current->buf.ptr.raw + current->event_json.json_bytes, - current->buf.used - current->event_json.json_bytes); - current->buf.used -= current->event_json.json_bytes; - current->event_json.json_bytes = 0; + current->buf.ptr.raw + current->event_collector_un.json_bytes, + current->buf.used - current->event_collector_un.json_bytes); + current->buf.used -= current->event_collector_un.json_bytes; + current->event_collector_un.json_bytes = 0; } return 0; @@ -946,7 +1100,7 @@ static int setup_signalfd(int epollfd) return -1; } - if (add_in_event(epollfd, sfd, NULL) != 0) + if (add_in_event_fd(epollfd, sfd) != 0) { return -1; } @@ -967,33 +1121,24 @@ static int mainloop(int epollfd) while (nDPIsrvd_main_thread_shutdown == 0) { - int nready = epoll_wait(epollfd, events, events_size, -1); + int nready = epoll_wait(epollfd, events, events_size, 1000); for (int i = 0; i < nready; i++) { if (events[i].events & EPOLLERR || events[i].events & EPOLLHUP) { - if (events[i].data.fd != json_sockfd && events[i].data.fd != serv_sockfd) + if (events[i].data.fd != collector_un_sockfd && events[i].data.fd != distributor_un_sockfd && + events[i].data.fd != distributor_in_sockfd) { struct remote_desc * const current = (struct remote_desc *)events[i].data.ptr; switch (current->sock_type) { - case JSON_SOCK: + case COLLECTOR_UN: logger(1, "Collector disconnected: %d", current->fd); break; - case SERV_SOCK: - if (current->event_serv.peer_addr[0] == '\0') - { - logger(1, "%s", "Distributor disconnected"); - } - else - { - logger(1, - "Distributor disconnected: %.*s:%u", - (int)sizeof(current->event_serv.peer_addr), - current->event_serv.peer_addr, - current->event_serv.peer.sin_port); - } + case DISTRIBUTOR_UN: + case DISTRIBUTOR_IN: + logger_distributor(current, "Distributor connection", "closed"); break; } disconnect_client(epollfd, current); @@ -1005,7 +1150,8 @@ static int mainloop(int epollfd) continue; } - if (events[i].data.fd == json_sockfd || events[i].data.fd == serv_sockfd) + if (events[i].data.fd == collector_un_sockfd || events[i].data.fd == distributor_un_sockfd || + events[i].data.fd == distributor_in_sockfd) { /* New connection to collector / distributor. */ if (new_connection(epollfd, events[i].data.fd) != 0) @@ -1036,10 +1182,10 @@ static int mainloop(int epollfd) } else { - /* Incoming data / Outoing data ready to send. */ + /* Incoming data / Outoing data ready to receive / send. */ if (handle_data_event(epollfd, &events[i]) != 0) { - continue; + /* do nothing */ } } } @@ -1066,18 +1212,27 @@ static int setup_event_queue(void) return -1; } - if (add_in_event(epollfd, json_sockfd, NULL) != 0) + if (add_in_event_fd(epollfd, collector_un_sockfd) != 0) { - logger(1, "Error adding JSON fd to epoll: %s", strerror(errno)); + logger(1, "Error adding collector UNIX socket fd to epoll: %s", strerror(errno)); return -1; } - if (add_in_event(epollfd, serv_sockfd, NULL) != 0) + if (add_in_event_fd(epollfd, distributor_un_sockfd) != 0) { - logger(1, "Error adding SERV fd to epoll: %s", strerror(errno)); + logger(1, "Error adding distributor UNIX socket fd to epoll: %s", strerror(errno)); return -1; } + if (distributor_in_sockfd >= 0) + { + if (add_in_event_fd(epollfd, distributor_in_sockfd) != 0) + { + logger(1, "Error adding distributor TCP/IP socket fd to epoll: %s", strerror(errno)); + return -1; + } + } + return epollfd; } @@ -1134,12 +1289,21 @@ int main(int argc, char ** argv) retval = 1; } - if (access(nDPIsrvd_options.json_sockpath, F_OK) == 0) + if (access(nDPIsrvd_options.collector_un_sockpath, F_OK) == 0) { logger_early(1, - "UNIX socket %s exists; nDPIsrvd already running? " + "UNIX socket `%s' exists; nDPIsrvd already running? " "Please remove the socket manually or change socket path.", - nDPIsrvd_options.json_sockpath); + nDPIsrvd_options.collector_un_sockpath); + return 1; + } + + if (access(nDPIsrvd_options.distributor_un_sockpath, F_OK) == 0) + { + logger_early(1, + "UNIX socket `%s' exists; nDPIsrvd already running? " + "Please remove the socket manually or change socket path.", + nDPIsrvd_options.distributor_un_sockpath); return 1; } @@ -1155,17 +1319,27 @@ int main(int argc, char ** argv) goto error; } - if (create_listen_sockets() != 0) + switch (create_listen_sockets()) { - goto error; + case 0: + break; + case 1: + goto error; + case 2: + unlink(nDPIsrvd_options.collector_un_sockpath); + goto error; + case 3: + goto error_unlink_sockets; + default: + goto error; } - logger(0, "collector listen on %s", nDPIsrvd_options.json_sockpath); - logger(0, "distributor listen on %s", nDPIsrvd_options.serv_optarg); - switch (serv_address.raw.sa_family) + logger(0, "collector UNIX socket listen on `%s'", nDPIsrvd_options.collector_un_sockpath); + logger(0, "distributor UNIX listen on `%s'", nDPIsrvd_options.distributor_un_sockpath); + switch (distributor_in_address.raw.sa_family) { default: - goto error; + goto error_unlink_sockets; case AF_INET: case AF_INET6: logger(1, @@ -1173,16 +1347,16 @@ int main(int argc, char ** argv) "everyone with access to the device/network. You've been warned!"); break; case AF_UNIX: + case 0xFFFF: break; } errno = 0; - if (nDPIsrvd_options.user != NULL && - change_user_group(nDPIsrvd_options.user, - nDPIsrvd_options.group, - nDPIsrvd_options.pidfile, - nDPIsrvd_options.json_sockpath, - (serv_address.raw.sa_family == AF_UNIX ? nDPIsrvd_options.serv_optarg : NULL)) != 0) + if (nDPIsrvd_options.user != NULL && change_user_group(nDPIsrvd_options.user, + nDPIsrvd_options.group, + nDPIsrvd_options.pidfile, + nDPIsrvd_options.collector_un_sockpath, + nDPIsrvd_options.distributor_un_sockpath) != 0) { if (errno != 0) { @@ -1199,11 +1373,10 @@ int main(int argc, char ** argv) nDPIsrvd_options.user, (nDPIsrvd_options.group != NULL ? nDPIsrvd_options.group : "-")); } - goto error; + goto error_unlink_sockets; } signal(SIGPIPE, SIG_IGN); - signal(SIGINT, SIG_IGN); signal(SIGTERM, SIG_IGN); signal(SIGQUIT, SIG_IGN); @@ -1211,22 +1384,24 @@ int main(int argc, char ** argv) epollfd = setup_event_queue(); if (epollfd < 0) { - goto error; + goto error_unlink_sockets; } retval = mainloop(epollfd); close_event_queue(epollfd); + +error_unlink_sockets: + unlink(nDPIsrvd_options.collector_un_sockpath); + unlink(nDPIsrvd_options.distributor_un_sockpath); error: - close(json_sockfd); - close(serv_sockfd); + close(collector_un_sockfd); + close(distributor_un_sockfd); + close(distributor_in_sockfd); daemonize_shutdown(nDPIsrvd_options.pidfile); logger(0, "Bye."); shutdown_logging(); - unlink(nDPIsrvd_options.json_sockpath); - unlink(nDPIsrvd_options.serv_optarg); - return retval; } #endif |