summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--nDPIsrvd.c702
1 files changed, 371 insertions, 331 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index 030393bfd..082a5c53a 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -24,13 +24,15 @@ struct io_buffer
size_t max;
};
+enum sock_type
+{
+ JSON_SOCK,
+ SERV_SOCK
+};
+
struct remote_desc
{
- enum
- {
- JSON_SOCK,
- SERV_SOCK
- } type;
+ enum sock_type sock_type;
int fd;
struct io_buffer buf;
union {
@@ -264,8 +266,370 @@ static int parse_options(int argc, char ** argv)
return 0;
}
+static int new_connection(int epollfd, int eventfd)
+{
+ struct remote_desc * current = get_unused_remote_descriptor();
+
+ if (current == NULL)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes.desc_used);
+ return 1;
+ }
+ current->sock_type = (eventfd == json_sockfd ? JSON_SOCK : SERV_SOCK);
+
+ int sockfd = (current->sock_type == JSON_SOCK ? json_sockfd : serv_sockfd);
+ socklen_t peer_addr_len =
+ (current->sock_type == JSON_SOCK ? sizeof(current->event_json.peer) : sizeof(current->event_serv.peer));
+
+ current->fd = accept(sockfd,
+ (current->sock_type == JSON_SOCK ? (struct sockaddr *)&current->event_json.peer
+ : (struct sockaddr *)&current->event_serv.peer),
+ &peer_addr_len);
+ if (current->fd < 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Accept failed: %s", strerror(errno));
+ disconnect_client(epollfd, current);
+ return 1;
+ }
+
+ switch (current->sock_type)
+ {
+ case JSON_SOCK:
+ current->event_json.json_bytes = 0;
+ syslog(LOG_DAEMON, "New collector connection");
+ break;
+ case SERV_SOCK:
+ if (inet_ntop(current->event_serv.peer.sin_family,
+ &current->event_serv.peer.sin_addr,
+ &current->event_serv.peer_addr[0],
+ sizeof(current->event_serv.peer_addr)) == NULL)
+ {
+ if (errno == EAFNOSUPPORT)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "New distributor connection.");
+ }
+ else
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error converting an internet address: %s", strerror(errno));
+ }
+ current->event_serv.peer_addr[0] = '\0';
+ }
+ else
+ {
+ syslog(LOG_DAEMON,
+ "New distributor connection from %.*s:%u",
+ (int)sizeof(current->event_serv.peer_addr),
+ current->event_serv.peer_addr,
+ ntohs(current->event_serv.peer.sin_port));
+ }
+ break;
+ }
+
+ /* nonblocking fd is mandatory */
+ int fd_flags = fcntl(current->fd, F_GETFL, 0);
+ if (fd_flags == -1 || fcntl(current->fd, F_SETFL, fd_flags | O_NONBLOCK) == -1)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno));
+ disconnect_client(epollfd, current);
+ return 1;
+ }
+
+ /* shutdown writing end for collector clients */
+ if (current->sock_type == JSON_SOCK)
+ {
+ shutdown(current->fd, SHUT_WR); // collector
+ /* setup epoll event */
+ struct epoll_event accept_event = {};
+ accept_event.data.ptr = current;
+ accept_event.events = EPOLLIN;
+ if (epoll_ctl(epollfd, EPOLL_CTL_ADD, current->fd, &accept_event) < 0)
+ {
+ disconnect_client(epollfd, current);
+ return 1;
+ }
+ }
+ else
+ {
+ shutdown(current->fd, SHUT_RD); // distributor
+ }
+
+ return 0;
+}
+
+static int handle_collector_protocol(int epollfd, struct remote_desc * const current)
+{
+ char * json_str_start = NULL;
+
+ if (current->buf.ptr[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "BUG: JSON invalid opening character: '%c'",
+ current->buf.ptr[NETWORK_BUFFER_LENGTH_DIGITS]);
+ disconnect_client(epollfd, current);
+ return 1;
+ }
+
+ errno = 0;
+ current->event_json.json_bytes = strtoull((char *)current->buf.ptr, &json_str_start, 10);
+ current->event_json.json_bytes += (uint8_t *)json_str_start - current->buf.ptr;
+
+ if (errno == ERANGE)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "BUG: Size of JSON exceeds limit");
+ disconnect_client(epollfd, current);
+ return 1;
+ }
+
+ if ((uint8_t *)json_str_start == current->buf.ptr)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "BUG: Missing size before JSON string: \"%.*s\"",
+ NETWORK_BUFFER_LENGTH_DIGITS,
+ current->buf.ptr);
+ disconnect_client(epollfd, current);
+ return 1;
+ }
+
+ if (current->event_json.json_bytes > current->buf.max)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "BUG: JSON string too big: %llu > %zu",
+ current->event_json.json_bytes,
+ current->buf.max);
+ disconnect_client(epollfd, current);
+ return 1;
+ }
+
+ if (current->event_json.json_bytes > current->buf.used)
+ {
+ return 1;
+ }
+
+ if (current->buf.ptr[current->event_json.json_bytes - 2] != '}' ||
+ current->buf.ptr[current->event_json.json_bytes - 1] != '\n')
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "BUG: Invalid JSON string: %.*s",
+ (int)current->event_json.json_bytes,
+ current->buf.ptr);
+ disconnect_client(epollfd, current);
+ return 1;
+ }
+
+ return 0;
+}
+
+static int handle_incoming_data(int epollfd, struct epoll_event * event)
+{
+ struct remote_desc * current = (struct remote_desc *)event->data.ptr;
+
+ if (current->fd < 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "file descriptor `%d' got from event data invalid", current->fd);
+ return 1;
+ }
+
+ if (event->events & EPOLLIN && current->sock_type == JSON_SOCK)
+ {
+ /* read JSON strings (or parts) from the UNIX socket (collecting) */
+ if (current->buf.used == current->buf.max)
+ {
+ syslog(LOG_DAEMON, "Collector read buffer full. No more read possible.");
+ }
+ else
+ {
+ errno = 0;
+ ssize_t bytes_read =
+ read(current->fd, current->buf.ptr + current->buf.used, current->buf.max - current->buf.used);
+ if (bytes_read < 0 || errno != 0)
+ {
+ disconnect_client(epollfd, current);
+ return 1;
+ }
+ if (bytes_read == 0)
+ {
+ syslog(LOG_DAEMON, "Collector connection closed during read");
+ disconnect_client(epollfd, current);
+ return 1;
+ }
+ current->buf.used += bytes_read;
+ }
+
+ while (current->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1)
+ {
+ if (handle_collector_protocol(epollfd, current) != 0)
+ {
+ break;
+ }
+
+ for (size_t i = 0; i < remotes.desc_size; ++i)
+ {
+ if (remotes.desc[i].fd < 0)
+ {
+ continue;
+ }
+ if (remotes.desc[i].sock_type != SERV_SOCK)
+ {
+ continue;
+ }
+ if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Buffer capacity threshold (%zu of max %zu bytes) reached, "
+ "falling back to blocking mode.",
+ remotes.desc[i].buf.used,
+ remotes.desc[i].buf.max);
+ /*
+ * FIXME: Maybe switch to a Multithreading distributor data transmission,
+ * so that we do not have to switch back to blocking mode here!
+ * NOTE: If *one* distributer peer is too slow, all other distributors are
+ * affected by this. This causes starvation and leads to a possible data loss on
+ * the nDPId collector side.
+ */
+ int fd_flags = fcntl(remotes.desc[i].fd, F_GETFL, 0);
+ if (fd_flags == -1 || fcntl(remotes.desc[i].fd, F_SETFL, fd_flags & ~O_NONBLOCK) == -1)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno));
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
+ if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used) !=
+ (ssize_t)remotes.desc[i].buf.used)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Could not drain buffer by %zu bytes. (forced)",
+ remotes.desc[i].buf.used);
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
+ remotes.desc[i].buf.used = 0;
+ if (fcntl(remotes.desc[i].fd, F_SETFL, fd_flags) == -1)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno));
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
+ }
+
+ memcpy(remotes.desc[i].buf.ptr + remotes.desc[i].buf.used,
+ current->buf.ptr,
+ current->event_json.json_bytes);
+ remotes.desc[i].buf.used += current->event_json.json_bytes;
+
+ errno = 0;
+ ssize_t bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used);
+ if (errno == EAGAIN)
+ {
+ continue;
+ }
+ if (bytes_written < 0 || errno != 0)
+ {
+ if (remotes.desc[i].event_serv.peer_addr[0] == '\0')
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Distributor connection closed, send failed: %s", strerror(errno));
+ }
+ else
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Distributor connection to %.*s:%u closed, send failed: %s",
+ (int)sizeof(remotes.desc[i].event_serv.peer_addr),
+ remotes.desc[i].event_serv.peer_addr,
+ ntohs(remotes.desc[i].event_serv.peer.sin_port),
+ strerror(errno));
+ }
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
+ if (bytes_written == 0)
+ {
+ syslog(LOG_DAEMON,
+ "Distributor connection to %.*s:%u closed during write",
+ (int)sizeof(remotes.desc[i].event_serv.peer_addr),
+ remotes.desc[i].event_serv.peer_addr,
+ ntohs(remotes.desc[i].event_serv.peer.sin_port));
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
+ if ((size_t)bytes_written < remotes.desc[i].buf.used)
+ {
+ syslog(LOG_DAEMON,
+ "Distributor wrote less than expected to %.*s:%u: %zd < %zu",
+ (int)sizeof(remotes.desc[i].event_serv.peer_addr),
+ remotes.desc[i].event_serv.peer_addr,
+ ntohs(remotes.desc[i].event_serv.peer.sin_port),
+ bytes_written,
+ remotes.desc[i].buf.used);
+ memmove(remotes.desc[i].buf.ptr,
+ remotes.desc[i].buf.ptr + bytes_written,
+ remotes.desc[i].buf.used - bytes_written);
+ remotes.desc[i].buf.used -= bytes_written;
+ continue;
+ }
+
+ remotes.desc[i].buf.used = 0;
+ }
+
+ memmove(current->buf.ptr,
+ current->buf.ptr + current->event_json.json_bytes,
+ current->buf.used - current->event_json.json_bytes);
+ current->buf.used -= current->event_json.json_bytes;
+ current->event_json.json_bytes = 0;
+ }
+ }
+
+ return 0;
+}
+
+static int mainloop(int epollfd)
+{
+ struct epoll_event events[32];
+ size_t const events_size = sizeof(events) / sizeof(events[0]);
+
+ while (main_thread_shutdown == 0)
+ {
+ int nready = epoll_wait(epollfd, events, events_size, -1);
+
+ for (int i = 0; i < nready; i++)
+ {
+ if (events[i].events & EPOLLERR)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Epoll event error: %s",
+ (errno != 0 ? strerror(errno) : "Client disconnected"));
+ if (events[i].data.fd != json_sockfd && events[i].data.fd != serv_sockfd)
+ {
+ struct remote_desc * current = (struct remote_desc *)events[i].data.ptr;
+ disconnect_client(epollfd, current);
+ }
+ continue;
+ }
+
+ if (events[i].data.fd == json_sockfd || events[i].data.fd == serv_sockfd)
+ {
+ /* New connection to collector / distributor. */
+ if (new_connection(epollfd, events[i].data.fd) != 0)
+ {
+ continue;
+ }
+ }
+ else
+ {
+ /* Incoming data. */
+ if (handle_incoming_data(epollfd, &events[i]) != 0)
+ {
+ continue;
+ }
+ }
+ }
+ }
+
+ return 0;
+}
+
int main(int argc, char ** argv)
{
+ int retval = 1;
+
if (argc == 0)
{
return 1;
@@ -371,331 +735,7 @@ int main(int argc, char ** argv)
goto error;
}
- struct epoll_event events[32];
- size_t const events_size = sizeof(events) / sizeof(events[0]);
- while (main_thread_shutdown == 0)
- {
- struct remote_desc * current = NULL;
- int nready = epoll_wait(epollfd, events, events_size, -1);
-
- for (int i = 0; i < nready; i++)
- {
- if (events[i].events & EPOLLERR)
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "Epoll event error: %s",
- (errno != 0 ? strerror(errno) : "Client disconnected"));
- if (events[i].data.fd != json_sockfd && events[i].data.fd != serv_sockfd)
- {
- current = (struct remote_desc *)events[i].data.ptr;
- disconnect_client(epollfd, current);
- }
- continue;
- }
-
- /* New connection to collector / distributor. */
- if (events[i].data.fd == json_sockfd || events[i].data.fd == serv_sockfd)
- {
- current = get_unused_remote_descriptor();
- if (current == NULL)
- {
- syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes.desc_used);
- continue;
- }
- current->type = (events[i].data.fd == json_sockfd ? JSON_SOCK : SERV_SOCK);
-
- int sockfd = (current->type == JSON_SOCK ? json_sockfd : serv_sockfd);
- socklen_t peer_addr_len =
- (current->type == JSON_SOCK ? sizeof(current->event_json.peer) : sizeof(current->event_serv.peer));
-
- current->fd = accept(sockfd,
- (current->type == JSON_SOCK ? (struct sockaddr *)&current->event_json.peer
- : (struct sockaddr *)&current->event_serv.peer),
- &peer_addr_len);
- if (current->fd < 0)
- {
- syslog(LOG_DAEMON | LOG_ERR, "Accept failed: %s", strerror(errno));
- disconnect_client(epollfd, current);
- continue;
- }
-
- switch (current->type)
- {
- case JSON_SOCK:
- current->event_json.json_bytes = 0;
- syslog(LOG_DAEMON, "New collector connection");
- break;
- case SERV_SOCK:
- if (inet_ntop(current->event_serv.peer.sin_family,
- &current->event_serv.peer.sin_addr,
- &current->event_serv.peer_addr[0],
- sizeof(current->event_serv.peer_addr)) == NULL)
- {
- if (errno == EAFNOSUPPORT)
- {
- syslog(LOG_DAEMON | LOG_ERR, "New distributor connection.");
- }
- else
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "Error converting an internet address: %s",
- strerror(errno));
- }
- current->event_serv.peer_addr[0] = '\0';
- }
- else
- {
- syslog(LOG_DAEMON,
- "New distributor connection from %.*s:%u",
- (int)sizeof(current->event_serv.peer_addr),
- current->event_serv.peer_addr,
- ntohs(current->event_serv.peer.sin_port));
- }
- break;
- }
-
- /* nonblocking fd is mandatory */
- int fd_flags = fcntl(current->fd, F_GETFL, 0);
- if (fd_flags == -1 || fcntl(current->fd, F_SETFL, fd_flags | O_NONBLOCK) == -1)
- {
- syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno));
- disconnect_client(epollfd, current);
- continue;
- }
-
- /* shutdown writing end for collector clients */
- if (current->type == JSON_SOCK)
- {
- shutdown(current->fd, SHUT_WR); // collector
- /* setup epoll event */
- struct epoll_event accept_event = {};
- accept_event.data.ptr = current;
- accept_event.events = EPOLLIN;
- if (epoll_ctl(epollfd, EPOLL_CTL_ADD, current->fd, &accept_event) < 0)
- {
- disconnect_client(epollfd, current);
- continue;
- }
- }
- else
- {
- shutdown(current->fd, SHUT_RD); // distributor
- }
- }
- else
- {
- current = (struct remote_desc *)events[i].data.ptr;
-
- if (current->fd < 0)
- {
- syslog(LOG_DAEMON | LOG_ERR, "file descriptor `%d' got from event data invalid", current->fd);
- continue;
- }
-
- if (events[i].events & EPOLLIN && current->type == JSON_SOCK)
- {
- /* read JSON strings (or parts) from the UNIX socket (collecting) */
- if (current->buf.used == current->buf.max)
- {
- syslog(LOG_DAEMON, "Collector read buffer full. No more read possible.");
- }
- else
- {
- errno = 0;
- ssize_t bytes_read = read(current->fd,
- current->buf.ptr + current->buf.used,
- current->buf.max - current->buf.used);
- if (bytes_read < 0 || errno != 0)
- {
- disconnect_client(epollfd, current);
- continue;
- }
- if (bytes_read == 0)
- {
- syslog(LOG_DAEMON, "Collector connection closed during read");
- disconnect_client(epollfd, current);
- continue;
- }
- current->buf.used += bytes_read;
- }
-
- while (current->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1)
- {
- if (current->buf.ptr[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "BUG: JSON invalid opening character: '%c'",
- current->buf.ptr[NETWORK_BUFFER_LENGTH_DIGITS]);
- disconnect_client(epollfd, current);
- break;
- }
-
- errno = 0;
- char * json_str_start = NULL;
- current->event_json.json_bytes = strtoull((char *)current->buf.ptr, &json_str_start, 10);
- current->event_json.json_bytes += (uint8_t *)json_str_start - current->buf.ptr;
-
- if (errno == ERANGE)
- {
- syslog(LOG_DAEMON | LOG_ERR, "BUG: Size of JSON exceeds limit");
- disconnect_client(epollfd, current);
- break;
- }
- if ((uint8_t *)json_str_start == current->buf.ptr)
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "BUG: Missing size before JSON string: \"%.*s\"",
- NETWORK_BUFFER_LENGTH_DIGITS,
- current->buf.ptr);
- disconnect_client(epollfd, current);
- break;
- }
- if (current->event_json.json_bytes > current->buf.max)
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "BUG: JSON string too big: %llu > %zu",
- current->event_json.json_bytes,
- current->buf.max);
- disconnect_client(epollfd, current);
- break;
- }
- if (current->event_json.json_bytes > current->buf.used)
- {
- break;
- }
-
- if (current->buf.ptr[current->event_json.json_bytes - 2] != '}' ||
- current->buf.ptr[current->event_json.json_bytes - 1] != '\n')
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "BUG: Invalid JSON string: %.*s",
- (int)current->event_json.json_bytes,
- current->buf.ptr);
- disconnect_client(epollfd, current);
- break;
- }
-
- for (size_t i = 0; i < remotes.desc_size; ++i)
- {
- if (remotes.desc[i].fd < 0)
- {
- continue;
- }
- if (remotes.desc[i].type != SERV_SOCK)
- {
- continue;
- }
- if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used)
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "Buffer capacity threshold (%zu of max %zu bytes) reached, "
- "falling back to blocking mode.",
- remotes.desc[i].buf.used,
- remotes.desc[i].buf.max);
- /*
- * FIXME: Maybe switch to a Multithreading distributor data transmission,
- * so that we do not have to switch back to blocking mode here!
- * NOTE: If *one* distributer peer is too slow, all other distributors are
- * affected by this. This causes starvation and leads to a possible data loss on
- * the nDPId collector side.
- */
- int fd_flags = fcntl(remotes.desc[i].fd, F_GETFL, 0);
- if (fd_flags == -1 || fcntl(remotes.desc[i].fd, F_SETFL, fd_flags & ~O_NONBLOCK) == -1)
- {
- syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno));
- disconnect_client(epollfd, &remotes.desc[i]);
- continue;
- }
- if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used) !=
- (ssize_t)remotes.desc[i].buf.used)
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "Could not drain buffer by %zu bytes. (forced)",
- remotes.desc[i].buf.used);
- disconnect_client(epollfd, &remotes.desc[i]);
- continue;
- }
- remotes.desc[i].buf.used = 0;
- if (fcntl(remotes.desc[i].fd, F_SETFL, fd_flags) == -1)
- {
- syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno));
- disconnect_client(epollfd, &remotes.desc[i]);
- continue;
- }
- }
-
- memcpy(remotes.desc[i].buf.ptr + remotes.desc[i].buf.used,
- current->buf.ptr,
- current->event_json.json_bytes);
- remotes.desc[i].buf.used += current->event_json.json_bytes;
-
- errno = 0;
- ssize_t bytes_written =
- write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used);
- if (errno == EAGAIN)
- {
- continue;
- }
- if (bytes_written < 0 || errno != 0)
- {
- if (remotes.desc[i].event_serv.peer_addr[0] == '\0')
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "Distributor connection closed, send failed: %s",
- strerror(errno));
- }
- else
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "Distributor connection to %.*s:%u closed, send failed: %s",
- (int)sizeof(remotes.desc[i].event_serv.peer_addr),
- remotes.desc[i].event_serv.peer_addr,
- ntohs(remotes.desc[i].event_serv.peer.sin_port),
- strerror(errno));
- }
- disconnect_client(epollfd, &remotes.desc[i]);
- continue;
- }
- if (bytes_written == 0)
- {
- syslog(LOG_DAEMON,
- "Distributor connection to %.*s:%u closed during write",
- (int)sizeof(remotes.desc[i].event_serv.peer_addr),
- remotes.desc[i].event_serv.peer_addr,
- ntohs(remotes.desc[i].event_serv.peer.sin_port));
- disconnect_client(epollfd, &remotes.desc[i]);
- continue;
- }
- if ((size_t)bytes_written < remotes.desc[i].buf.used)
- {
- syslog(LOG_DAEMON,
- "Distributor wrote less than expected to %.*s:%u: %zd < %zu",
- (int)sizeof(remotes.desc[i].event_serv.peer_addr),
- remotes.desc[i].event_serv.peer_addr,
- ntohs(remotes.desc[i].event_serv.peer.sin_port),
- bytes_written,
- remotes.desc[i].buf.used);
- memmove(remotes.desc[i].buf.ptr,
- remotes.desc[i].buf.ptr + bytes_written,
- remotes.desc[i].buf.used - bytes_written);
- remotes.desc[i].buf.used -= bytes_written;
- continue;
- }
-
- remotes.desc[i].buf.used = 0;
- }
-
- memmove(current->buf.ptr,
- current->buf.ptr + current->event_json.json_bytes,
- current->buf.used - current->event_json.json_bytes);
- current->buf.used -= current->event_json.json_bytes;
- current->event_json.json_bytes = 0;
- }
- }
- }
- }
- }
-
+ retval = mainloop(epollfd);
error:
close(json_sockfd);
close(serv_sockfd);
@@ -707,5 +747,5 @@ error:
unlink(json_sockpath);
unlink(serv_optarg);
- return 0;
+ return retval;
}