diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2022-03-13 02:28:10 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2022-03-13 02:28:10 +0100 |
commit | ed1647b9446f84d81d41e8e28ccf063eff97b2f7 (patch) | |
tree | 7f22929aca611955ea129dc0afee839bb63872bf /nDPIsrvd.c | |
parent | dd35d9da3fd43f1091b8ec496ec25d72e54d8e22 (diff) |
Disconnect nDPIsrvd clients immediately instead waiting for a failed write().
* nDPIsrvd: Collector/Distributor logging improved
* nDPIsrvd: Command line option for max remote descriptors
* nDPId: Stop spamming nDPIsrvd Collector with the same events over and over again
* nDPId: Refactored some variable names and events
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r-- | nDPIsrvd.c | 266 |
1 files changed, 181 insertions, 85 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 454e4a85f..05d3287e5 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -3,6 +3,7 @@ #include <fcntl.h> #include <netdb.h> #include <netinet/tcp.h> +#include <pwd.h> #include <signal.h> #include <stdio.h> #include <stdlib.h> @@ -38,11 +39,14 @@ struct remote_desc int collector_sockfd; struct sockaddr_un peer; unsigned long long int json_bytes; + pid_t pid; } event_collector_un; struct { int distributor_sockfd; struct sockaddr_un peer; + pid_t pid; + char * user_name; } event_distributor_un; /* UNIX socket */ struct { @@ -56,8 +60,8 @@ struct remote_desc static struct { struct remote_desc * desc; - size_t desc_size; - size_t desc_used; + nDPIsrvd_ull desc_size; + nDPIsrvd_ull desc_used; } remotes = {NULL, 0, 0}; static int nDPIsrvd_main_thread_shutdown = 0; @@ -74,12 +78,19 @@ static struct char * collector_un_sockpath; char * distributor_un_sockpath; char * distributor_in_address; + nDPIsrvd_ull max_remote_descriptors; char * user; char * group; nDPIsrvd_ull cache_array_length; int cache_fallback_to_blocking; -} nDPIsrvd_options = {.cache_array_length = nDPIsrvd_CACHE_ARRAY_LENGTH, .cache_fallback_to_blocking = 1}; - +} nDPIsrvd_options = {.max_remote_descriptors = nDPIsrvd_MAX_REMOTE_DESCRIPTORS, + .cache_array_length = nDPIsrvd_CACHE_ARRAY_LENGTH, + .cache_fallback_to_blocking = 1}; + +static void logger_nDPIsrvd(struct remote_desc const * const remote, + char const * const prefix, + char const * const format, + ...); static int fcntl_add_flags(int fd, int flags); static int fcntl_del_flags(int fd, int flags); static int add_in_event_fd(int epollfd, int fd); @@ -139,17 +150,20 @@ static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, { if (nDPIsrvd_options.cache_fallback_to_blocking == 0) { - logger(1, "Buffer cache limit (%u lines) reached, remote too slow.", utarray_len(remote->buf_cache)); + logger_nDPIsrvd(remote, + "Buffer cache limit for", + "for reached, remote too slow: %u lines", + utarray_len(remote->buf_cache)); return -1; } else { - logger(0, - "Buffer JSON string cache limit (%u lines) reached, falling back to blocking I/O.", - utarray_len(remote->buf_cache)); + logger_nDPIsrvd(remote, + "Buffer JSON string cache limit for", + "reached, falling back to blocking I/O: %u lines", + utarray_len(remote->buf_cache)); if (drain_cache_blocking(remote) != 0) { - logger(1, "Could not drain buffer cache in blocking I/O: %s", strerror(errno)); return -1; } } @@ -162,10 +176,10 @@ static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, return 0; } -static void logger_distributor(struct remote_desc * const remote, - char const * const prefix, - char const * const format, - ...) +static void logger_nDPIsrvd(struct remote_desc const * const remote, + char const * const prefix, + char const * const format, + ...) { char logbuf[512]; va_list ap; @@ -173,23 +187,28 @@ static void logger_distributor(struct remote_desc * const remote, va_start(ap, format); vsnprintf(logbuf, sizeof(logbuf), format, ap); - if (remote->sock_type == DISTRIBUTOR_UN) - { - logger(1, "%s %s", prefix, logbuf); - } - else if (remote->sock_type == DISTRIBUTOR_IN) - { - logger(1, - "%s %.*s:%u %s", - prefix, - (int)sizeof(remote->event_distributor_in.peer_addr), - remote->event_distributor_in.peer_addr, - ntohs(remote->event_distributor_in.peer.sin_port), - logbuf); - } - else + switch (remote->sock_type) { - logger(1, "BUG: Distributor logging interface called with an collector/invalid remote"); + case DISTRIBUTOR_UN: + logger(1, + "%s PID %d (User: %s) %s", + prefix, + remote->event_distributor_un.pid, + remote->event_distributor_un.user_name, + logbuf); + break; + case DISTRIBUTOR_IN: + logger(1, + "%s %.*s:%u %s", + prefix, + (int)sizeof(remote->event_distributor_in.peer_addr), + remote->event_distributor_in.peer_addr, + ntohs(remote->event_distributor_in.peer.sin_port), + logbuf); + break; + case COLLECTOR_UN: + logger(1, "%s PID %d %s", prefix, remote->event_collector_un.pid, logbuf); + break; } va_end(ap); @@ -210,18 +229,18 @@ static int drain_main_buffer(struct remote_desc * const remote) } if (bytes_written < 0 || errno != 0) { - logger_distributor(remote, "Distributor connection", "%d closed, send failed: %s", remote->fd, strerror(errno)); + logger_nDPIsrvd(remote, "Distributor connection", "closed, send failed: %s", strerror(errno)); return -1; } if (bytes_written == 0) { - logger_distributor(remote, "Distributor connection", "%d closed", remote->fd); + logger_nDPIsrvd(remote, "Distributor connection", "closed"); return -1; } if ((size_t)bytes_written < remote->buf.used) { #if 0 - logger_distributor( + logger_nDPIsrvd( remote, "Distributor", "wrote less than expected: %zd < %zu", bytes_written, remote->buf.used); #endif memmove(remote->buf.ptr.raw, remote->buf.ptr.raw + bytes_written, remote->buf.used - bytes_written); @@ -274,16 +293,17 @@ static int drain_cache_blocking(struct remote_desc * const remote) if (fcntl_del_flags(remote->fd, O_NONBLOCK) != 0) { - logger(1, "Error setting distributor fd flags: %s", strerror(errno)); + logger_nDPIsrvd(remote, "Error setting distributor", "fd flags to blocking mode: %s", strerror(errno)); return -1; } if (drain_cache(remote) != 0) { + logger_nDPIsrvd(remote, "Could not drain buffer cache for", "in blocking I/O: %s", strerror(errno)); retval = -1; } if (fcntl_add_flags(remote->fd, O_NONBLOCK) != 0) { - logger(1, "Error setting distributor fd flags: %s", strerror(errno)); + logger_nDPIsrvd(remote, "Error setting distributor", "fd flags to non-blocking mode: %s", strerror(errno)); return -1; } @@ -298,6 +318,7 @@ static int handle_outgoing_data(int epollfd, struct remote_desc * const remote) } if (drain_cache(remote) != 0) { + logger_nDPIsrvd(remote, "Could not drain buffer cache for", ": %s", strerror(errno)); disconnect_client(epollfd, remote); return -1; } @@ -354,7 +375,7 @@ static int create_listen_sockets(void) int opt = 1; if (setsockopt(distributor_in_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { - logger(1, "TCP/IP socket setsockopt(SO_REUSEADDR) failed: %s", strerror(errno)); + logger(1, "Setting TCP/IP socket option SO_REUSEADDR failed: %s", strerror(errno)); } } @@ -363,7 +384,7 @@ static int create_listen_sockets(void) if (setsockopt(collector_un_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0 || setsockopt(distributor_un_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { - logger(1, "UNIX socket setsockopt(SO_REUSEADDR) failed: %s", strerror(errno)); + logger(1, "Setting UNIX socket option SO_REUSEADDR failed: %s", strerror(errno)); } } @@ -484,7 +505,7 @@ static int create_listen_sockets(void) return 0; } -static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, int remote_fd, size_t max_buffer_size) +static struct remote_desc * get_remote_descriptor(enum sock_type type, int remote_fd, size_t max_buffer_size) { if (remotes.desc_used == remotes.desc_size) { @@ -513,7 +534,7 @@ static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, in return NULL; } -static void free_remote_descriptor_data(void) +static void free_remotes(void) { for (size_t i = 0; i < remotes.desc_size; ++i) { @@ -585,7 +606,21 @@ static void disconnect_client(int epollfd, struct remote_desc * const current) del_event(epollfd, current->fd); if (close(current->fd) != 0) { - logger(1, "Error closing fd: %s", strerror(errno)); + switch (current->sock_type) + { + case COLLECTOR_UN: + logger_nDPIsrvd(current, "Error closing collector connection", ": %s", strerror(errno)); + break; + case DISTRIBUTOR_UN: + case DISTRIBUTOR_IN: + logger_nDPIsrvd(current, "Error closing distributor connection", ": %s", strerror(errno)); + break; + } + } + if (current->sock_type == DISTRIBUTOR_UN) + { + free(current->event_distributor_un.user_name); + current->event_distributor_un.user_name = NULL; } current->fd = -1; remotes.desc_used--; @@ -601,7 +636,7 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) { int opt; - while ((opt = getopt(argc, argv, "lL:c:dp:s:S:u:g:C:Dvh")) != -1) + while ((opt = getopt(argc, argv, "lL:c:dp:s:S:m:u:g:C:Dvh")) != -1) { switch (opt) { @@ -633,6 +668,13 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) free(nDPIsrvd_options.distributor_in_address); nDPIsrvd_options.distributor_in_address = strdup(optarg); break; + case 'm': + if (str_value_to_ull(optarg, &nDPIsrvd_options.max_remote_descriptors) != CONVERSION_OK) + { + fprintf(stderr, "%s: Argument for `-C' is not a number: %s\n", argv[0], optarg); + return 1; + } + break; case 'u': free(nDPIsrvd_options.user); nDPIsrvd_options.user = strdup(optarg); @@ -660,7 +702,7 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) fprintf(stderr, "Usage: %s [-l] [-L logfile] [-c path-to-unix-sock] [-d] [-p pidfile]\n" "\t[-s path-to-distributor-unix-socket] [-S distributor-host:port]\n" - "\t[-u user] [-g group]\n" + "\t[-m max-remote-descriptors] [-u user] [-g group]\n" "\t[-C max-buffered-collector-json-lines] [-D]\n" "\t[-v] [-h]\n", argv[0]); @@ -734,10 +776,10 @@ static struct remote_desc * accept_remote(int server_fd, return NULL; } - struct remote_desc * current = get_unused_remote_descriptor(socktype, client_fd, NETWORK_BUFFER_MAX_SIZE); + struct remote_desc * current = get_remote_descriptor(socktype, client_fd, NETWORK_BUFFER_MAX_SIZE); if (current == NULL) { - logger(1, "Max number of connections reached: %zu", remotes.desc_used); + logger(1, "Max number of connections reached: %llu", remotes.desc_used); return NULL; } @@ -791,7 +833,6 @@ static int new_connection(int epollfd, int eventfd) case COLLECTOR_UN: current->event_collector_un.peer = sockaddr.saddr_collector_un; current->event_collector_un.json_bytes = 0; - logger(1, "New collector connection"); sockopt = NETWORK_BUFFER_MAX_SIZE; if (setsockopt(current->fd, SOL_SOCKET, SO_RCVBUF, &sockopt, sizeof(sockopt)) < 0) @@ -799,12 +840,49 @@ static int new_connection(int epollfd, int eventfd) logger(1, "Error setting socket option SO_RCVBUF: %s", strerror(errno)); return 1; } + + struct ucred ucred = {}; + socklen_t ucred_len = sizeof(ucred); + if (getsockopt(current->fd, SOL_SOCKET, SO_PEERCRED, &ucred, &ucred_len) == -1) + { + logger(1, "Error getting credentials from UNIX socket: %s", strerror(errno)); + return 1; + } + + current->event_collector_un.pid = ucred.pid; + + logger_nDPIsrvd(current, "New collector connection from", ""); break; case DISTRIBUTOR_UN: case DISTRIBUTOR_IN: if (current->sock_type == DISTRIBUTOR_UN) { current->event_distributor_un.peer = sockaddr.saddr_distributor_un; + + struct ucred ucred = {}; + socklen_t ucred_len = sizeof(ucred); + if (getsockopt(current->fd, SOL_SOCKET, SO_PEERCRED, &ucred, &ucred_len) == -1) + { + logger(1, "Error getting credentials from UNIX socket: %s", strerror(errno)); + return 1; + } + + struct passwd pwnam = {}; + struct passwd * pwres = NULL; + ssize_t pwsiz = sysconf(_SC_GETPW_R_SIZE_MAX); + if (pwsiz == -1) + { + pwsiz = BUFSIZ; + } + char buf[pwsiz]; + if (getpwuid_r(ucred.uid, &pwnam, &buf[0], pwsiz, &pwres) != 0) + { + logger(1, "Could not get passwd entry for user id %u", ucred.uid); + return 1; + } + + current->event_distributor_un.pid = ucred.pid; + current->event_distributor_un.user_name = strdup(pwres->pw_name); } else { @@ -816,6 +894,15 @@ static int new_connection(int epollfd, int eventfd) logger(1, "Error setting socket option SO_RCVBUF: %s", strerror(errno)); return 1; } + + if (inet_ntop(current->event_distributor_in.peer.sin_family, + ¤t->event_distributor_in.peer.sin_addr, + ¤t->event_distributor_in.peer_addr[0], + sizeof(current->event_distributor_in.peer_addr)) == NULL) + { + logger(1, "Error converting an internet address: %s", strerror(errno)); + return 1; + } } sockopt = NETWORK_BUFFER_MAX_SIZE; @@ -825,19 +912,6 @@ static int new_connection(int epollfd, int eventfd) return 1; } - if (inet_ntop(current->event_distributor_in.peer.sin_family, - ¤t->event_distributor_in.peer.sin_addr, - ¤t->event_distributor_in.peer_addr[0], - sizeof(current->event_distributor_in.peer_addr)) == NULL) - { - if (errno != EAFNOSUPPORT) - { - logger(1, "Error converting an internet address: %s", strerror(errno)); - } - current->event_distributor_in.peer_addr[0] = '\0'; - } - logger_distributor(current, "New distributor connection from", "%d", current->fd); - { struct timeval send_timeout = {1, 0}; if (setsockopt(current->fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&send_timeout, sizeof(send_timeout)) != 0) @@ -845,13 +919,15 @@ static int new_connection(int epollfd, int eventfd) logger(1, "Error setting socket option send timeout: %s", strerror(errno)); } } + + logger_nDPIsrvd(current, "New distributor connection from", ""); break; } /* nonblocking fd is mandatory */ if (fcntl_add_flags(current->fd, O_NONBLOCK) != 0) { - logger(1, "Error setting fd flags: %s", strerror(errno)); + logger(1, "Error setting fd flags to non-blocking mode: %s", strerror(errno)); disconnect_client(epollfd, current); return 1; } @@ -880,7 +956,10 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur if (current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{') { - logger(1, "BUG: JSON invalid opening character: '%c'", current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]); + logger_nDPIsrvd(current, + "BUG: Collector connection", + "JSON invalid opening character: '%c'", + current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]); disconnect_client(epollfd, current); return 1; } @@ -891,33 +970,40 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur if (errno == ERANGE) { - logger(1, "BUG: Size of JSON exceeds limit"); + logger_nDPIsrvd(current, "BUG: Collector connection", "JSON string length exceeds numceric limits"); disconnect_client(epollfd, current); return 1; } if (json_str_start == current->buf.ptr.text) { - logger(1, - "BUG: Missing size before JSON string: \"%.*s\"", - NETWORK_BUFFER_LENGTH_DIGITS, - current->buf.ptr.text); + logger_nDPIsrvd(current, + "BUG: Collector connection", + "missing JSON string length in protocol preamble: \"%.*s\"", + NETWORK_BUFFER_LENGTH_DIGITS, + current->buf.ptr.text); disconnect_client(epollfd, current); return 1; } if (json_str_start - current->buf.ptr.text != NETWORK_BUFFER_LENGTH_DIGITS) { - logger(1, - "BUG: Invalid collector protocol data received. Expected protocol preamble of size %u bytes, got %ld " - "bytes", - NETWORK_BUFFER_LENGTH_DIGITS, - (long int)(json_str_start - current->buf.ptr.text)); + logger_nDPIsrvd(current, + "BUG: Collector connection", + "invalid collector protocol data received. Expected protocol preamble of size %u bytes, got " + "%ld " + "bytes", + NETWORK_BUFFER_LENGTH_DIGITS, + (long int)(json_str_start - current->buf.ptr.text)); } if (current->event_collector_un.json_bytes > current->buf.max) { - logger(1, "BUG: JSON string too big: %llu > %zu", current->event_collector_un.json_bytes, current->buf.max); + logger_nDPIsrvd(current, + "BUG: Collector connection", + "JSON string too big: %llu > %zu", + current->event_collector_un.json_bytes, + current->buf.max); disconnect_client(epollfd, current); return 1; } @@ -930,7 +1016,11 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur if (current->buf.ptr.text[current->event_collector_un.json_bytes - 2] != '}' || current->buf.ptr.text[current->event_collector_un.json_bytes - 1] != '\n') { - logger(1, "BUG: Invalid JSON string: %.*s", (int)current->event_collector_un.json_bytes, current->buf.ptr.text); + logger_nDPIsrvd(current, + "BUG: Collector connection", + "invalid JSON string: %.*s", + (int)current->event_collector_un.json_bytes, + current->buf.ptr.text); disconnect_client(epollfd, current); return 1; } @@ -946,11 +1036,11 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) if (read(current->fd, &garbage, sizeof(garbage)) == sizeof(garbage)) { - logger_distributor(current, "Received data from", "%d which is not allowed to send us some.", current->fd); + logger_nDPIsrvd(current, "Received data from", "who is not allowed to send us some."); } else { - logger_distributor(current, "Distributor connection", "%d closed", current->fd); + logger_nDPIsrvd(current, "Distributor connection", "closed"); } disconnect_client(epollfd, current); return 1; @@ -959,7 +1049,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) /* read JSON strings (or parts) from the UNIX socket (collecting) */ if (current->buf.used == current->buf.max) { - logger(1, "Collector read buffer full. No more read possible."); + logger_nDPIsrvd(current, + "Collector connection", + "read buffer (%zu bytes) full. No more read possible.", + current->buf.max); } else { @@ -968,13 +1061,13 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) read(current->fd, current->buf.ptr.raw + current->buf.used, current->buf.max - current->buf.used); if (bytes_read < 0 || errno != 0) { - logger(1, "Could not read remote: %s", strerror(errno)); + logger_nDPIsrvd(current, "Could not read remote", ": %s", strerror(errno)); disconnect_client(epollfd, current); return 1; } if (bytes_read == 0) { - logger(0, "Collector connection closed during read"); + logger_nDPIsrvd(0, "Collector connection", "closed during read"); disconnect_client(epollfd, current); return 1; } @@ -1005,7 +1098,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) if (utarray_len(remotes.desc[i].buf_cache) == 0) { #if 0 - logger_distributor(&remotes.desc[i], + logger_nDPIsrvd(&remotes.desc[i], "Distributor", "buffer capacity threshold (%zu bytes) reached, caching JSON strings.", remotes.desc[i].buf.used); @@ -1013,7 +1106,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) errno = 0; if (add_out_event(epollfd, &remotes.desc[i]) != 0) { - logger(1, "%s: %s", "Could not add event, disconnecting", strerror(errno)); + logger_nDPIsrvd(&remotes.desc[i], + "Could not add event to", + ", disconnecting: %s", + strerror(errno)); disconnect_client(epollfd, &remotes.desc[i]); continue; } @@ -1134,11 +1230,11 @@ static int mainloop(int epollfd) switch (current->sock_type) { case COLLECTOR_UN: - logger(1, "Collector disconnected: %d", current->fd); + logger_nDPIsrvd(current, "Collector disconnected", "closed"); break; case DISTRIBUTOR_UN: case DISTRIBUTOR_IN: - logger_distributor(current, "Distributor connection", "closed"); + logger_nDPIsrvd(current, "Distributor connection", "closed"); break; } disconnect_client(epollfd, current); @@ -1193,7 +1289,7 @@ static int mainloop(int epollfd) close(signalfd); - free_remote_descriptor_data(); + free_remotes(); return 0; } @@ -1245,10 +1341,10 @@ static void close_event_queue(int epollfd) close(epollfd); } -static int setup_remote_descriptors(size_t max_descriptors) +static int setup_remote_descriptors(nDPIsrvd_ull max_remote_descriptors) { remotes.desc_used = 0; - remotes.desc_size = max_descriptors; + remotes.desc_size = max_remote_descriptors; remotes.desc = (struct remote_desc *)nDPIsrvd_calloc(remotes.desc_size, sizeof(*remotes.desc)); if (remotes.desc == NULL) { @@ -1286,7 +1382,7 @@ int main(int argc, char ** argv) "%s", "Daemon mode `-d' and `-l' can not be used together, " "because stdout/stderr is beeing redirected to /dev/null"); - retval = 1; + return 1; } if (access(nDPIsrvd_options.collector_un_sockpath, F_OK) == 0) @@ -1314,7 +1410,7 @@ int main(int argc, char ** argv) goto error; } - if (setup_remote_descriptors(32) != 0) + if (setup_remote_descriptors(nDPIsrvd_options.max_remote_descriptors) != 0) { goto error; } |