aboutsummaryrefslogtreecommitdiff
path: root/nDPIsrvd.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2022-03-10 13:51:21 +0100
committerToni Uhlig <matzeton@googlemail.com>2022-03-10 14:26:07 +0100
commit41757ecf1cbcbcd890c1ab7e08995aaffe031752 (patch)
tree0421cdcc7573aa40fc1e198f16f02913e5e7bcd0 /nDPIsrvd.c
parent6f1f9e65ea86bba7c944b183e7d413a14f71852d (diff)
Added nDPIsrvd TCP/IP support for distributors.
* nDPIsrvd: Improved distributor client disconnect detection * nDPIsrvd: Fixed invalid usage of epoll_add instead of epoll_mod * nPDIsrvd: Improved logging for distributor clients Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r--nDPIsrvd.c633
1 files changed, 404 insertions, 229 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index 791901f78..454e4a85f 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -20,8 +20,9 @@
enum sock_type
{
- JSON_SOCK,
- SERV_SOCK
+ COLLECTOR_UN,
+ DISTRIBUTOR_UN,
+ DISTRIBUTOR_IN,
};
struct remote_desc
@@ -34,16 +35,21 @@ struct remote_desc
{
struct
{
- int json_sockfd;
+ int collector_sockfd;
struct sockaddr_un peer;
unsigned long long int json_bytes;
- } event_json;
+ } event_collector_un;
struct
{
- int serv_sockfd;
+ int distributor_sockfd;
+ struct sockaddr_un peer;
+ } event_distributor_un; /* UNIX socket */
+ struct
+ {
+ int distributor_sockfd;
struct sockaddr_in peer;
char peer_addr[INET_ADDRSTRLEN];
- } event_serv;
+ } event_distributor_in; /* TCP/IP socket */
};
};
@@ -55,17 +61,19 @@ static struct
} remotes = {NULL, 0, 0};
static int nDPIsrvd_main_thread_shutdown = 0;
-static int json_sockfd;
-static int serv_sockfd;
-static struct nDPIsrvd_address serv_address = {
+static int collector_un_sockfd = -1;
+static int distributor_un_sockfd = -1;
+static int distributor_in_sockfd = -1;
+static struct nDPIsrvd_address distributor_in_address = {
.raw.sa_family = 0xFFFF,
};
static struct
{
char * pidfile;
- char * json_sockpath;
- char * serv_optarg;
+ char * collector_un_sockpath;
+ char * distributor_un_sockpath;
+ char * distributor_in_address;
char * user;
char * group;
nDPIsrvd_ull cache_array_length;
@@ -74,10 +82,10 @@ static struct
static int fcntl_add_flags(int fd, int flags);
static int fcntl_del_flags(int fd, int flags);
+static int add_in_event_fd(int epollfd, int fd);
+static int add_in_event(int epollfd, struct remote_desc * const remote);
+static int del_out_event(int epollfd, struct remote_desc * const remote);
static void disconnect_client(int epollfd, struct remote_desc * const current);
-static int add_in_event(int epollfd, int fd, void * ptr);
-static int add_out_event(int epollfd, int fd, void * ptr);
-static int del_event(int epollfd, int fd);
static int drain_cache_blocking(struct remote_desc * const remote);
static void nDPIsrvd_buffer_array_copy(void * dst, const void * src)
@@ -154,6 +162,39 @@ static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf,
return 0;
}
+static void logger_distributor(struct remote_desc * const remote,
+ char const * const prefix,
+ char const * const format,
+ ...)
+{
+ char logbuf[512];
+ va_list ap;
+
+ va_start(ap, format);
+ vsnprintf(logbuf, sizeof(logbuf), format, ap);
+
+ if (remote->sock_type == DISTRIBUTOR_UN)
+ {
+ logger(1, "%s %s", prefix, logbuf);
+ }
+ else if (remote->sock_type == DISTRIBUTOR_IN)
+ {
+ logger(1,
+ "%s %.*s:%u %s",
+ prefix,
+ (int)sizeof(remote->event_distributor_in.peer_addr),
+ remote->event_distributor_in.peer_addr,
+ ntohs(remote->event_distributor_in.peer.sin_port),
+ logbuf);
+ }
+ else
+ {
+ logger(1, "BUG: Distributor logging interface called with an collector/invalid remote");
+ }
+
+ va_end(ap);
+}
+
static int drain_main_buffer(struct remote_desc * const remote)
{
if (remote->buf.used == 0)
@@ -169,46 +210,19 @@ static int drain_main_buffer(struct remote_desc * const remote)
}
if (bytes_written < 0 || errno != 0)
{
- if (remote->event_serv.peer_addr[0] == '\0')
- {
- logger(1, "Distributor connection closed, send failed: %s", strerror(errno));
- }
- else
- {
- logger(1,
- "Distributor connection to %.*s:%u closed, send failed: %s",
- (int)sizeof(remote->event_serv.peer_addr),
- remote->event_serv.peer_addr,
- ntohs(remote->event_serv.peer.sin_port),
- strerror(errno));
- }
+ logger_distributor(remote, "Distributor connection", "%d closed, send failed: %s", remote->fd, strerror(errno));
return -1;
}
if (bytes_written == 0)
{
- if (remote->event_serv.peer_addr[0] == '\0')
- {
- logger(0, "%s", "Distributor connection closed during write");
- }
- else
- {
- logger(0,
- "Distributor connection to %.*s:%u closed during write",
- (int)sizeof(remote->event_serv.peer_addr),
- remote->event_serv.peer_addr,
- ntohs(remote->event_serv.peer.sin_port));
- }
+ logger_distributor(remote, "Distributor connection", "%d closed", remote->fd);
return -1;
}
if ((size_t)bytes_written < remote->buf.used)
{
#if 0
- logger(0, "Distributor wrote less than expected to %.*s:%u: %zd < %zu",
- (int)sizeof(remote->event_serv.peer_addr),
- remote->event_serv.peer_addr,
- ntohs(remote->event_serv.peer.sin_port),
- bytes_written,
- remote->buf.used);
+ logger_distributor(
+ remote, "Distributor", "wrote less than expected: %zd < %zu", bytes_written, remote->buf.used);
#endif
memmove(remote->buf.ptr.raw, remote->buf.ptr.raw + bytes_written, remote->buf.used - bytes_written);
}
@@ -278,7 +292,7 @@ static int drain_cache_blocking(struct remote_desc * const remote)
static int handle_outgoing_data(int epollfd, struct remote_desc * const remote)
{
- if (remote->sock_type != SERV_SOCK)
+ if (remote->sock_type != DISTRIBUTOR_UN && remote->sock_type != DISTRIBUTOR_IN)
{
return -1;
}
@@ -289,7 +303,7 @@ static int handle_outgoing_data(int epollfd, struct remote_desc * const remote)
}
if (utarray_len(remote->buf_cache) == 0)
{
- del_event(epollfd, remote->fd);
+ return del_out_event(epollfd, remote);
}
return 0;
@@ -321,73 +335,150 @@ static int fcntl_del_flags(int fd, int flags)
static int create_listen_sockets(void)
{
- json_sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
- serv_sockfd = socket(serv_address.raw.sa_family, SOCK_STREAM, 0);
- if (json_sockfd < 0 || serv_sockfd < 0)
+ collector_un_sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
+ distributor_un_sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (collector_un_sockfd < 0 || distributor_un_sockfd < 0)
{
- logger(1, "Error opening socket: %s", strerror(errno));
+ logger(1, "Error creating UNIX socket: %s", strerror(errno));
return 1;
}
- int opt = 1;
- if (setsockopt(json_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0 ||
- setsockopt(serv_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
+ if (nDPIsrvd_options.distributor_in_address != NULL)
{
- logger(1, "setsockopt with SO_REUSEADDR failed: %s", strerror(errno));
+ distributor_in_sockfd = socket(distributor_in_address.raw.sa_family, SOCK_STREAM, 0);
+ if (distributor_in_sockfd < 0)
+ {
+ logger(1, "Error creating TCP/IP socket: %s", strerror(errno));
+ return 1;
+ }
+ int opt = 1;
+ if (setsockopt(distributor_in_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
+ {
+ logger(1, "TCP/IP socket setsockopt(SO_REUSEADDR) failed: %s", strerror(errno));
+ }
}
- struct sockaddr_un json_addr;
- json_addr.sun_family = AF_UNIX;
- int written = snprintf(json_addr.sun_path, sizeof(json_addr.sun_path), "%s", nDPIsrvd_options.json_sockpath);
- if (written < 0)
{
- logger(1, "snprintf failed: %s", strerror(errno));
- return 1;
+ int opt = 1;
+ if (setsockopt(collector_un_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0 ||
+ setsockopt(distributor_un_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
+ {
+ logger(1, "UNIX socket setsockopt(SO_REUSEADDR) failed: %s", strerror(errno));
+ }
}
- else if (written == sizeof(json_addr.sun_path))
+
{
- logger(1,
- "JSON socket path too long, current/max: %zu/%zu",
- strlen(nDPIsrvd_options.json_sockpath),
- sizeof(json_addr.sun_path) - 1);
- return 1;
+ struct sockaddr_un collector_addr;
+ collector_addr.sun_family = AF_UNIX;
+ int written = snprintf(collector_addr.sun_path,
+ sizeof(collector_addr.sun_path),
+ "%s",
+ nDPIsrvd_options.collector_un_sockpath);
+ if (written < 0)
+ {
+ logger(1, "snprintf failed: %s", strerror(errno));
+ return 1;
+ }
+ else if (written == sizeof(collector_addr.sun_path))
+ {
+ logger(1,
+ "Collector UNIX socket path too long, current/max: %zu/%zu",
+ strlen(nDPIsrvd_options.collector_un_sockpath),
+ sizeof(collector_addr.sun_path) - 1);
+ return 1;
+ }
+
+ if (bind(collector_un_sockfd, (struct sockaddr *)&collector_addr, sizeof(collector_addr)) < 0)
+ {
+ logger(1,
+ "Error binding Collector UNIX socket to `%s': %s",
+ nDPIsrvd_options.collector_un_sockpath,
+ strerror(errno));
+ return 1;
+ }
}
- if (bind(json_sockfd, (struct sockaddr *)&json_addr, sizeof(json_addr)) < 0)
{
- unlink(nDPIsrvd_options.json_sockpath);
- logger(1,
- "Error on binding UNIX socket (collector) to %s: %s",
- nDPIsrvd_options.json_sockpath,
- strerror(errno));
- return 1;
+ struct sockaddr_un distributor_addr;
+ distributor_addr.sun_family = AF_UNIX;
+ int written = snprintf(distributor_addr.sun_path,
+ sizeof(distributor_addr.sun_path),
+ "%s",
+ nDPIsrvd_options.distributor_un_sockpath);
+ if (written < 0)
+ {
+ logger(1, "snprintf failed: %s", strerror(errno));
+ return 2;
+ }
+ else if (written == sizeof(distributor_addr.sun_path))
+ {
+ logger(1,
+ "Distributor UNIX socket path too long, current/max: %zu/%zu",
+ strlen(nDPIsrvd_options.distributor_un_sockpath),
+ sizeof(distributor_addr.sun_path) - 1);
+ return 2;
+ }
+
+ if (bind(distributor_un_sockfd, (struct sockaddr *)&distributor_addr, sizeof(distributor_addr)) < 0)
+ {
+ logger(1,
+ "Error binding Distributor socket to `%s': %s",
+ nDPIsrvd_options.distributor_un_sockpath,
+ strerror(errno));
+ return 2;
+ }
}
- if (bind(serv_sockfd, &serv_address.raw, serv_address.size) < 0)
+ if (nDPIsrvd_options.distributor_in_address != NULL)
{
- logger(1, "Error on binding socket (distributor) to %s: %s", nDPIsrvd_options.serv_optarg, strerror(errno));
- unlink(nDPIsrvd_options.json_sockpath);
- return 1;
+ if (bind(distributor_in_sockfd, &distributor_in_address.raw, distributor_in_address.size) < 0)
+ {
+ logger(1,
+ "Error binding Distributor TCP/IP socket to %s: %s",
+ nDPIsrvd_options.distributor_in_address,
+ strerror(errno));
+ return 3;
+ }
+ if (listen(distributor_in_sockfd, 16) < 0)
+ {
+ logger(1,
+ "Error listening Distributor TCP/IP socket to %s: %s",
+ nDPIsrvd_options.distributor_in_address,
+ strerror(errno));
+ return 3;
+ }
+ if (fcntl_add_flags(distributor_in_sockfd, O_NONBLOCK) != 0)
+ {
+ logger(1,
+ "Error setting Distributor TCP/IP socket %s to non-blocking mode: %s",
+ nDPIsrvd_options.distributor_in_address,
+ strerror(errno));
+ return 3;
+ }
}
- if (listen(json_sockfd, 16) < 0 || listen(serv_sockfd, 16) < 0)
+ if (listen(collector_un_sockfd, 16) < 0 || listen(distributor_un_sockfd, 16) < 0)
{
- unlink(nDPIsrvd_options.json_sockpath);
- logger(1, "Error on listen: %s", strerror(errno));
- return 1;
+ logger(1, "Error listening UNIX socket: %s", strerror(errno));
+ return 3;
}
- if (fcntl_add_flags(json_sockfd, O_NONBLOCK) != 0)
+ if (fcntl_add_flags(collector_un_sockfd, O_NONBLOCK) != 0)
{
- unlink(nDPIsrvd_options.json_sockpath);
- logger(1, "Error setting fd flags for the collector socket: %s", strerror(errno));
- return 1;
+ logger(1,
+ "Error setting Collector UNIX socket `%s' to non-blocking mode: %s",
+ nDPIsrvd_options.collector_un_sockpath,
+ strerror(errno));
+ return 3;
}
- if (fcntl_add_flags(serv_sockfd, O_NONBLOCK) != 0)
+ if (fcntl_add_flags(distributor_un_sockfd, O_NONBLOCK) != 0)
{
- logger(1, "Error setting fd flags for the distributor socket: %s", strerror(errno));
- return 1;
+ logger(1,
+ "Error setting Distributor UNIX socket `%s' to non-blocking mode: %s",
+ nDPIsrvd_options.distributor_un_sockpath,
+ strerror(errno));
+ return 3;
}
return 0;
@@ -405,7 +496,10 @@ static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, in
if (remotes.desc[i].fd == -1)
{
remotes.desc_used++;
- utarray_new(remotes.desc[i].buf_cache, &nDPIsrvd_buffer_array_icd);
+ if (remotes.desc[i].buf_cache == NULL)
+ {
+ utarray_new(remotes.desc[i].buf_cache, &nDPIsrvd_buffer_array_icd);
+ }
if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, max_buffer_size) != 0 || remotes.desc[i].buf_cache == NULL)
{
return NULL;
@@ -449,14 +543,34 @@ static int add_event(int epollfd, int events, int fd, void * ptr)
return epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
}
-static int add_in_event(int epollfd, int fd, void * ptr)
+static int add_in_event_fd(int epollfd, int fd)
+{
+ return add_event(epollfd, EPOLLIN, fd, NULL);
+}
+
+static int add_in_event(int epollfd, struct remote_desc * const remote)
+{
+ return add_event(epollfd, EPOLLIN, remote->fd, remote);
+}
+
+static int mod_event(int epollfd, int events, int fd, void * ptr)
+{
+ struct epoll_event event = {};
+
+ event.data.ptr = ptr;
+ event.events = events;
+
+ return epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
+}
+
+static int add_out_event(int epollfd, struct remote_desc * const remote)
{
- return add_event(epollfd, EPOLLIN, fd, ptr);
+ return mod_event(epollfd, EPOLLIN | EPOLLOUT, remote->fd, remote);
}
-static int add_out_event(int epollfd, int fd, void * ptr)
+static int del_out_event(int epollfd, struct remote_desc * const remote)
{
- return add_event(epollfd, EPOLLOUT, fd, ptr);
+ return mod_event(epollfd, EPOLLIN, remote->fd, remote);
}
static int del_event(int epollfd, int fd)
@@ -487,7 +601,7 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
{
int opt;
- while ((opt = getopt(argc, argv, "lL:c:dp:s:u:g:C:Dvh")) != -1)
+ while ((opt = getopt(argc, argv, "lL:c:dp:s:S:u:g:C:Dvh")) != -1)
{
switch (opt)
{
@@ -501,8 +615,8 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
}
break;
case 'c':
- free(nDPIsrvd_options.json_sockpath);
- nDPIsrvd_options.json_sockpath = strdup(optarg);
+ free(nDPIsrvd_options.collector_un_sockpath);
+ nDPIsrvd_options.collector_un_sockpath = strdup(optarg);
break;
case 'd':
daemonize_enable();
@@ -512,8 +626,12 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
nDPIsrvd_options.pidfile = strdup(optarg);
break;
case 's':
- free(nDPIsrvd_options.serv_optarg);
- nDPIsrvd_options.serv_optarg = strdup(optarg);
+ free(nDPIsrvd_options.distributor_un_sockpath);
+ nDPIsrvd_options.distributor_un_sockpath = strdup(optarg);
+ break;
+ case 'S':
+ free(nDPIsrvd_options.distributor_in_address);
+ nDPIsrvd_options.distributor_in_address = strdup(optarg);
break;
case 'u':
free(nDPIsrvd_options.user);
@@ -541,7 +659,8 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
fprintf(stderr, "%s\n", get_nDPId_version());
fprintf(stderr,
"Usage: %s [-l] [-L logfile] [-c path-to-unix-sock] [-d] [-p pidfile]\n"
- "\t[-s path-to-unix-socket|distributor-host:port] [-u user] [-g group]\n"
+ "\t[-s path-to-distributor-unix-socket] [-S distributor-host:port]\n"
+ "\t[-u user] [-g group]\n"
"\t[-C max-buffered-collector-json-lines] [-D]\n"
"\t[-v] [-h]\n",
argv[0]);
@@ -558,33 +677,45 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
return 1;
}
- if (nDPIsrvd_options.json_sockpath == NULL)
+ if (nDPIsrvd_options.collector_un_sockpath == NULL)
{
- nDPIsrvd_options.json_sockpath = strdup(COLLECTOR_UNIX_SOCKET);
+ nDPIsrvd_options.collector_un_sockpath = strdup(COLLECTOR_UNIX_SOCKET);
}
- if (is_path_absolute("JSON socket", nDPIsrvd_options.json_sockpath) != 0)
+ if (is_path_absolute("Collector UNIX socket", nDPIsrvd_options.collector_un_sockpath) != 0)
{
return 1;
}
- if (nDPIsrvd_options.serv_optarg == NULL)
+ if (nDPIsrvd_options.distributor_un_sockpath == NULL)
{
- nDPIsrvd_options.serv_optarg = strdup(DISTRIBUTOR_UNIX_SOCKET);
+ nDPIsrvd_options.distributor_un_sockpath = strdup(DISTRIBUTOR_UNIX_SOCKET);
}
-
- if (nDPIsrvd_setup_address(&serv_address, nDPIsrvd_options.serv_optarg) != 0)
+ if (is_path_absolute("Distributor UNIX socket", nDPIsrvd_options.distributor_un_sockpath) != 0)
{
- logger_early(1, "%s: Could not parse address `%s'\n", argv[0], nDPIsrvd_options.serv_optarg);
return 1;
}
- if (serv_address.raw.sa_family == AF_UNIX && is_path_absolute("SERV socket", nDPIsrvd_options.serv_optarg) != 0)
+
+ if (nDPIsrvd_options.distributor_in_address != NULL)
{
- return 1;
+ if (nDPIsrvd_setup_address(&distributor_in_address, nDPIsrvd_options.distributor_in_address) != 0)
+ {
+ logger_early(1, "%s: Could not parse address %s", argv[0], nDPIsrvd_options.distributor_in_address);
+ return 1;
+ }
+ if (distributor_in_address.raw.sa_family == AF_UNIX)
+ {
+ logger_early(1,
+ "%s: You've requested to setup another UNIX socket `%s', but there is already one at `%s'",
+ argv[0],
+ nDPIsrvd_options.distributor_in_address,
+ nDPIsrvd_options.distributor_un_sockpath);
+ return 1;
+ }
}
if (optind < argc)
{
- logger_early(1, "%s: Unexpected argument after options\n", argv[0]);
+ logger_early(1, "%s: Unexpected argument after options", argv[0]);
return 1;
}
@@ -617,24 +748,31 @@ static int new_connection(int epollfd, int eventfd)
{
union
{
- struct sockaddr_un event_json;
- struct sockaddr_un event_serv;
+ struct sockaddr_un saddr_collector_un;
+ struct sockaddr_un saddr_distributor_un;
+ struct sockaddr_in saddr_distributor_in;
} sockaddr;
socklen_t peer_addr_len;
enum sock_type stype;
int server_fd;
- if (eventfd == json_sockfd)
+ if (eventfd == collector_un_sockfd)
{
- peer_addr_len = sizeof(sockaddr.event_json);
- stype = JSON_SOCK;
- server_fd = json_sockfd;
+ peer_addr_len = sizeof(sockaddr.saddr_collector_un);
+ stype = COLLECTOR_UN;
+ server_fd = collector_un_sockfd;
}
- else if (eventfd == serv_sockfd)
+ else if (eventfd == distributor_un_sockfd)
{
- peer_addr_len = sizeof(sockaddr.event_serv);
- stype = SERV_SOCK;
- server_fd = serv_sockfd;
+ peer_addr_len = sizeof(sockaddr.saddr_distributor_un);
+ stype = DISTRIBUTOR_UN;
+ server_fd = distributor_un_sockfd;
+ }
+ else if (eventfd == distributor_in_sockfd)
+ {
+ peer_addr_len = sizeof(sockaddr.saddr_distributor_in);
+ stype = DISTRIBUTOR_IN;
+ server_fd = distributor_in_sockfd;
}
else
{
@@ -647,63 +785,65 @@ static int new_connection(int epollfd, int eventfd)
return 1;
}
- char const * sock_type = NULL;
- int sockopt = NETWORK_BUFFER_MAX_SIZE;
+ int sockopt;
switch (current->sock_type)
{
- case JSON_SOCK:
- sock_type = "collector";
- current->event_json.json_bytes = 0;
- logger(0, "New collector connection");
+ case COLLECTOR_UN:
+ current->event_collector_un.peer = sockaddr.saddr_collector_un;
+ current->event_collector_un.json_bytes = 0;
+ logger(1, "New collector connection");
+ sockopt = NETWORK_BUFFER_MAX_SIZE;
if (setsockopt(current->fd, SOL_SOCKET, SO_RCVBUF, &sockopt, sizeof(sockopt)) < 0)
{
logger(1, "Error setting socket option SO_RCVBUF: %s", strerror(errno));
return 1;
}
break;
- case SERV_SOCK:
- sock_type = "distributor";
+ case DISTRIBUTOR_UN:
+ case DISTRIBUTOR_IN:
+ if (current->sock_type == DISTRIBUTOR_UN)
+ {
+ current->event_distributor_un.peer = sockaddr.saddr_distributor_un;
+ }
+ else
+ {
+ current->event_distributor_in.peer = sockaddr.saddr_distributor_in;
+ sockopt = 1;
+ if (setsockopt(current->fd, SOL_SOCKET, SO_RCVBUF, &sockopt, sizeof(sockopt)) < 0)
+ {
+ logger(1, "Error setting socket option SO_RCVBUF: %s", strerror(errno));
+ return 1;
+ }
+ }
+
+ sockopt = NETWORK_BUFFER_MAX_SIZE;
if (setsockopt(current->fd, SOL_SOCKET, SO_SNDBUF, &sockopt, sizeof(sockopt)) < 0)
{
logger(1, "Error setting socket option SO_SNDBUF: %s", strerror(errno));
return 1;
}
- if (inet_ntop(current->event_serv.peer.sin_family,
- &current->event_serv.peer.sin_addr,
- &current->event_serv.peer_addr[0],
- sizeof(current->event_serv.peer_addr)) == NULL)
+ if (inet_ntop(current->event_distributor_in.peer.sin_family,
+ &current->event_distributor_in.peer.sin_addr,
+ &current->event_distributor_in.peer_addr[0],
+ sizeof(current->event_distributor_in.peer_addr)) == NULL)
{
- if (errno == EAFNOSUPPORT)
- {
- logger(0, "%s", "New distributor connection.");
- }
- else
+ if (errno != EAFNOSUPPORT)
{
logger(1, "Error converting an internet address: %s", strerror(errno));
}
- current->event_serv.peer_addr[0] = '\0';
- }
- else
- {
- logger(0,
- "New distributor connection from %.*s:%u",
- (int)sizeof(current->event_serv.peer_addr),
- current->event_serv.peer_addr,
- ntohs(current->event_serv.peer.sin_port));
+ current->event_distributor_in.peer_addr[0] = '\0';
}
+ logger_distributor(current, "New distributor connection from", "%d", current->fd);
+
{
struct timeval send_timeout = {1, 0};
if (setsockopt(current->fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&send_timeout, sizeof(send_timeout)) != 0)
{
logger(1, "Error setting socket option send timeout: %s", strerror(errno));
}
- if (setsockopt(current->fd, SOL_SOCKET, SO_SNDBUF, &sockopt, sizeof(sockopt)) < 0)
- {
- logger(1, "Error setting socket option SO_SNDBUF: %s", strerror(errno));
- }
}
break;
}
@@ -711,25 +851,24 @@ static int new_connection(int epollfd, int eventfd)
/* nonblocking fd is mandatory */
if (fcntl_add_flags(current->fd, O_NONBLOCK) != 0)
{
- logger(1, "Error setting %s fd flags: %s", sock_type, strerror(errno));
+ logger(1, "Error setting fd flags: %s", strerror(errno));
disconnect_client(epollfd, current);
return 1;
}
/* shutdown writing end for collector clients */
- if (current->sock_type == JSON_SOCK)
+ if (current->sock_type == COLLECTOR_UN)
{
shutdown(current->fd, SHUT_WR); // collector
- /* setup epoll event */
- if (add_in_event(epollfd, current->fd, current) != 0)
- {
- disconnect_client(epollfd, current);
- return 1;
- }
+ /* shutdown reading end for distributor clients does not work due to epoll usage */
}
- else
+
+ /* setup epoll event */
+ if (add_in_event(epollfd, current) != 0)
{
- shutdown(current->fd, SHUT_RD); // distributor
+ logger(1, "Error adding input event to %d: %s", current->fd, strerror(errno));
+ disconnect_client(epollfd, current);
+ return 1;
}
return 0;
@@ -747,8 +886,8 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
}
errno = 0;
- current->event_json.json_bytes = strtoull((char *)current->buf.ptr.text, &json_str_start, 10);
- current->event_json.json_bytes += json_str_start - current->buf.ptr.text;
+ current->event_collector_un.json_bytes = strtoull((char *)current->buf.ptr.text, &json_str_start, 10);
+ current->event_collector_un.json_bytes += json_str_start - current->buf.ptr.text;
if (errno == ERANGE)
{
@@ -776,22 +915,22 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
(long int)(json_str_start - current->buf.ptr.text));
}
- if (current->event_json.json_bytes > current->buf.max)
+ if (current->event_collector_un.json_bytes > current->buf.max)
{
- logger(1, "BUG: JSON string too big: %llu > %zu", current->event_json.json_bytes, current->buf.max);
+ logger(1, "BUG: JSON string too big: %llu > %zu", current->event_collector_un.json_bytes, current->buf.max);
disconnect_client(epollfd, current);
return 1;
}
- if (current->event_json.json_bytes > current->buf.used)
+ if (current->event_collector_un.json_bytes > current->buf.used)
{
return 1;
}
- if (current->buf.ptr.text[current->event_json.json_bytes - 2] != '}' ||
- current->buf.ptr.text[current->event_json.json_bytes - 1] != '\n')
+ if (current->buf.ptr.text[current->event_collector_un.json_bytes - 2] != '}' ||
+ current->buf.ptr.text[current->event_collector_un.json_bytes - 1] != '\n')
{
- logger(1, "BUG: Invalid JSON string: %.*s", (int)current->event_json.json_bytes, current->buf.ptr.text);
+ logger(1, "BUG: Invalid JSON string: %.*s", (int)current->event_collector_un.json_bytes, current->buf.ptr.text);
disconnect_client(epollfd, current);
return 1;
}
@@ -801,8 +940,19 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
static int handle_incoming_data(int epollfd, struct remote_desc * const current)
{
- if (current->sock_type != JSON_SOCK)
+ if (current->sock_type != COLLECTOR_UN)
{
+ unsigned char garbage = 0;
+
+ if (read(current->fd, &garbage, sizeof(garbage)) == sizeof(garbage))
+ {
+ logger_distributor(current, "Received data from", "%d which is not allowed to send us some.", current->fd);
+ }
+ else
+ {
+ logger_distributor(current, "Distributor connection", "%d closed", current->fd);
+ }
+ disconnect_client(epollfd, current);
return 1;
}
@@ -818,6 +968,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
read(current->fd, current->buf.ptr.raw + current->buf.used, current->buf.max - current->buf.used);
if (bytes_read < 0 || errno != 0)
{
+ logger(1, "Could not read remote: %s", strerror(errno));
disconnect_client(epollfd, current);
return 1;
}
@@ -843,28 +994,31 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
{
continue;
}
- if (remotes.desc[i].sock_type != SERV_SOCK)
+ if (remotes.desc[i].sock_type != DISTRIBUTOR_UN && remotes.desc[i].sock_type != DISTRIBUTOR_IN)
{
continue;
}
- if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used ||
+
+ if (current->event_collector_un.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used ||
utarray_len(remotes.desc[i].buf_cache) > 0)
{
if (utarray_len(remotes.desc[i].buf_cache) == 0)
{
#if 0
- logger(0, "Buffer capacity threshold (%zu bytes) reached, caching JSON strings.", remotes.desc[i].buf.used);
+ logger_distributor(&remotes.desc[i],
+ "Distributor",
+ "buffer capacity threshold (%zu bytes) reached, caching JSON strings.",
+ remotes.desc[i].buf.used);
#endif
errno = 0;
- if (add_out_event(epollfd, remotes.desc[i].fd, &remotes.desc[i]) != 0 &&
- errno != EEXIST /* required for nDPId-test */)
+ if (add_out_event(epollfd, &remotes.desc[i]) != 0)
{
logger(1, "%s: %s", "Could not add event, disconnecting", strerror(errno));
disconnect_client(epollfd, &remotes.desc[i]);
continue;
}
}
- if (add_to_cache(&remotes.desc[i], current->buf.ptr.raw, current->event_json.json_bytes) != 0)
+ if (add_to_cache(&remotes.desc[i], current->buf.ptr.raw, current->event_collector_un.json_bytes) != 0)
{
disconnect_client(epollfd, &remotes.desc[i]);
continue;
@@ -874,8 +1028,8 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
{
memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used,
current->buf.ptr.raw,
- current->event_json.json_bytes);
- remotes.desc[i].buf.used += current->event_json.json_bytes;
+ current->event_collector_un.json_bytes);
+ remotes.desc[i].buf.used += current->event_collector_un.json_bytes;
}
if (drain_main_buffer(&remotes.desc[i]) != 0)
@@ -885,10 +1039,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
}
memmove(current->buf.ptr.raw,
- current->buf.ptr.raw + current->event_json.json_bytes,
- current->buf.used - current->event_json.json_bytes);
- current->buf.used -= current->event_json.json_bytes;
- current->event_json.json_bytes = 0;
+ current->buf.ptr.raw + current->event_collector_un.json_bytes,
+ current->buf.used - current->event_collector_un.json_bytes);
+ current->buf.used -= current->event_collector_un.json_bytes;
+ current->event_collector_un.json_bytes = 0;
}
return 0;
@@ -946,7 +1100,7 @@ static int setup_signalfd(int epollfd)
return -1;
}
- if (add_in_event(epollfd, sfd, NULL) != 0)
+ if (add_in_event_fd(epollfd, sfd) != 0)
{
return -1;
}
@@ -967,33 +1121,24 @@ static int mainloop(int epollfd)
while (nDPIsrvd_main_thread_shutdown == 0)
{
- int nready = epoll_wait(epollfd, events, events_size, -1);
+ int nready = epoll_wait(epollfd, events, events_size, 1000);
for (int i = 0; i < nready; i++)
{
if (events[i].events & EPOLLERR || events[i].events & EPOLLHUP)
{
- if (events[i].data.fd != json_sockfd && events[i].data.fd != serv_sockfd)
+ if (events[i].data.fd != collector_un_sockfd && events[i].data.fd != distributor_un_sockfd &&
+ events[i].data.fd != distributor_in_sockfd)
{
struct remote_desc * const current = (struct remote_desc *)events[i].data.ptr;
switch (current->sock_type)
{
- case JSON_SOCK:
+ case COLLECTOR_UN:
logger(1, "Collector disconnected: %d", current->fd);
break;
- case SERV_SOCK:
- if (current->event_serv.peer_addr[0] == '\0')
- {
- logger(1, "%s", "Distributor disconnected");
- }
- else
- {
- logger(1,
- "Distributor disconnected: %.*s:%u",
- (int)sizeof(current->event_serv.peer_addr),
- current->event_serv.peer_addr,
- current->event_serv.peer.sin_port);
- }
+ case DISTRIBUTOR_UN:
+ case DISTRIBUTOR_IN:
+ logger_distributor(current, "Distributor connection", "closed");
break;
}
disconnect_client(epollfd, current);
@@ -1005,7 +1150,8 @@ static int mainloop(int epollfd)
continue;
}
- if (events[i].data.fd == json_sockfd || events[i].data.fd == serv_sockfd)
+ if (events[i].data.fd == collector_un_sockfd || events[i].data.fd == distributor_un_sockfd ||
+ events[i].data.fd == distributor_in_sockfd)
{
/* New connection to collector / distributor. */
if (new_connection(epollfd, events[i].data.fd) != 0)
@@ -1036,10 +1182,10 @@ static int mainloop(int epollfd)
}
else
{
- /* Incoming data / Outoing data ready to send. */
+ /* Incoming data / Outoing data ready to receive / send. */
if (handle_data_event(epollfd, &events[i]) != 0)
{
- continue;
+ /* do nothing */
}
}
}
@@ -1066,18 +1212,27 @@ static int setup_event_queue(void)
return -1;
}
- if (add_in_event(epollfd, json_sockfd, NULL) != 0)
+ if (add_in_event_fd(epollfd, collector_un_sockfd) != 0)
{
- logger(1, "Error adding JSON fd to epoll: %s", strerror(errno));
+ logger(1, "Error adding collector UNIX socket fd to epoll: %s", strerror(errno));
return -1;
}
- if (add_in_event(epollfd, serv_sockfd, NULL) != 0)
+ if (add_in_event_fd(epollfd, distributor_un_sockfd) != 0)
{
- logger(1, "Error adding SERV fd to epoll: %s", strerror(errno));
+ logger(1, "Error adding distributor UNIX socket fd to epoll: %s", strerror(errno));
return -1;
}
+ if (distributor_in_sockfd >= 0)
+ {
+ if (add_in_event_fd(epollfd, distributor_in_sockfd) != 0)
+ {
+ logger(1, "Error adding distributor TCP/IP socket fd to epoll: %s", strerror(errno));
+ return -1;
+ }
+ }
+
return epollfd;
}
@@ -1134,12 +1289,21 @@ int main(int argc, char ** argv)
retval = 1;
}
- if (access(nDPIsrvd_options.json_sockpath, F_OK) == 0)
+ if (access(nDPIsrvd_options.collector_un_sockpath, F_OK) == 0)
{
logger_early(1,
- "UNIX socket %s exists; nDPIsrvd already running? "
+ "UNIX socket `%s' exists; nDPIsrvd already running? "
"Please remove the socket manually or change socket path.",
- nDPIsrvd_options.json_sockpath);
+ nDPIsrvd_options.collector_un_sockpath);
+ return 1;
+ }
+
+ if (access(nDPIsrvd_options.distributor_un_sockpath, F_OK) == 0)
+ {
+ logger_early(1,
+ "UNIX socket `%s' exists; nDPIsrvd already running? "
+ "Please remove the socket manually or change socket path.",
+ nDPIsrvd_options.distributor_un_sockpath);
return 1;
}
@@ -1155,17 +1319,27 @@ int main(int argc, char ** argv)
goto error;
}
- if (create_listen_sockets() != 0)
+ switch (create_listen_sockets())
{
- goto error;
+ case 0:
+ break;
+ case 1:
+ goto error;
+ case 2:
+ unlink(nDPIsrvd_options.collector_un_sockpath);
+ goto error;
+ case 3:
+ goto error_unlink_sockets;
+ default:
+ goto error;
}
- logger(0, "collector listen on %s", nDPIsrvd_options.json_sockpath);
- logger(0, "distributor listen on %s", nDPIsrvd_options.serv_optarg);
- switch (serv_address.raw.sa_family)
+ logger(0, "collector UNIX socket listen on `%s'", nDPIsrvd_options.collector_un_sockpath);
+ logger(0, "distributor UNIX listen on `%s'", nDPIsrvd_options.distributor_un_sockpath);
+ switch (distributor_in_address.raw.sa_family)
{
default:
- goto error;
+ goto error_unlink_sockets;
case AF_INET:
case AF_INET6:
logger(1,
@@ -1173,16 +1347,16 @@ int main(int argc, char ** argv)
"everyone with access to the device/network. You've been warned!");
break;
case AF_UNIX:
+ case 0xFFFF:
break;
}
errno = 0;
- if (nDPIsrvd_options.user != NULL &&
- change_user_group(nDPIsrvd_options.user,
- nDPIsrvd_options.group,
- nDPIsrvd_options.pidfile,
- nDPIsrvd_options.json_sockpath,
- (serv_address.raw.sa_family == AF_UNIX ? nDPIsrvd_options.serv_optarg : NULL)) != 0)
+ if (nDPIsrvd_options.user != NULL && change_user_group(nDPIsrvd_options.user,
+ nDPIsrvd_options.group,
+ nDPIsrvd_options.pidfile,
+ nDPIsrvd_options.collector_un_sockpath,
+ nDPIsrvd_options.distributor_un_sockpath) != 0)
{
if (errno != 0)
{
@@ -1199,11 +1373,10 @@ int main(int argc, char ** argv)
nDPIsrvd_options.user,
(nDPIsrvd_options.group != NULL ? nDPIsrvd_options.group : "-"));
}
- goto error;
+ goto error_unlink_sockets;
}
signal(SIGPIPE, SIG_IGN);
-
signal(SIGINT, SIG_IGN);
signal(SIGTERM, SIG_IGN);
signal(SIGQUIT, SIG_IGN);
@@ -1211,22 +1384,24 @@ int main(int argc, char ** argv)
epollfd = setup_event_queue();
if (epollfd < 0)
{
- goto error;
+ goto error_unlink_sockets;
}
retval = mainloop(epollfd);
close_event_queue(epollfd);
+
+error_unlink_sockets:
+ unlink(nDPIsrvd_options.collector_un_sockpath);
+ unlink(nDPIsrvd_options.distributor_un_sockpath);
error:
- close(json_sockfd);
- close(serv_sockfd);
+ close(collector_un_sockfd);
+ close(distributor_un_sockfd);
+ close(distributor_in_sockfd);
daemonize_shutdown(nDPIsrvd_options.pidfile);
logger(0, "Bye.");
shutdown_logging();
- unlink(nDPIsrvd_options.json_sockpath);
- unlink(nDPIsrvd_options.serv_optarg);
-
return retval;
}
#endif