summaryrefslogtreecommitdiff
path: root/nDPIsrvd.c
diff options
context:
space:
mode:
authorlns <matzeton@googlemail.com>2022-04-16 23:21:24 +0200
committerlns <matzeton@googlemail.com>2022-04-16 23:21:24 +0200
commitc283b89afda98fdcee81fe1b9634f10af1077878 (patch)
tree725a3496177903258069a2d11536c348722d79b8 /nDPIsrvd.c
parentdb83f82d29df4fd0dfe638fad305366fb265edb8 (diff)
Refactored buffer subsystem.
Signed-off-by: lns <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r--nDPIsrvd.c353
1 files changed, 249 insertions, 104 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index 05d3287e5..437ef5616 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -26,12 +26,17 @@ enum sock_type
DISTRIBUTOR_IN,
};
+struct nDPIsrvd_write_buffer
+{
+ struct nDPIsrvd_buffer buf;
+ size_t written;
+};
+
struct remote_desc
{
enum sock_type sock_type;
int fd;
- struct nDPIsrvd_buffer buf;
- UT_array * buf_cache;
+
union
{
struct
@@ -40,6 +45,8 @@ struct remote_desc
struct sockaddr_un peer;
unsigned long long int json_bytes;
pid_t pid;
+
+ struct nDPIsrvd_json_buffer main_read_buffer;
} event_collector_un;
struct
{
@@ -47,12 +54,18 @@ struct remote_desc
struct sockaddr_un peer;
pid_t pid;
char * user_name;
+
+ struct nDPIsrvd_write_buffer main_write_buffer;
+ UT_array * additional_write_buffers;
} event_distributor_un; /* UNIX socket */
struct
{
int distributor_sockfd;
struct sockaddr_in peer;
char peer_addr[INET_ADDRSTRLEN];
+
+ struct nDPIsrvd_write_buffer main_write_buffer;
+ UT_array * additional_write_buffers;
} event_distributor_in; /* TCP/IP socket */
};
};
@@ -81,11 +94,11 @@ static struct
nDPIsrvd_ull max_remote_descriptors;
char * user;
char * group;
- nDPIsrvd_ull cache_array_length;
- int cache_fallback_to_blocking;
+ nDPIsrvd_ull max_write_buffers;
+ int bufferbloat_fallback_to_blocking;
} nDPIsrvd_options = {.max_remote_descriptors = nDPIsrvd_MAX_REMOTE_DESCRIPTORS,
- .cache_array_length = nDPIsrvd_CACHE_ARRAY_LENGTH,
- .cache_fallback_to_blocking = 1};
+ .max_write_buffers = nDPIsrvd_MAX_WRITE_BUFFERS,
+ .bufferbloat_fallback_to_blocking = 1};
static void logger_nDPIsrvd(struct remote_desc const * const remote,
char const * const prefix,
@@ -97,34 +110,32 @@ static int add_in_event_fd(int epollfd, int fd);
static int add_in_event(int epollfd, struct remote_desc * const remote);
static int del_out_event(int epollfd, struct remote_desc * const remote);
static void disconnect_client(int epollfd, struct remote_desc * const current);
-static int drain_cache_blocking(struct remote_desc * const remote);
+static int drain_write_buffers_blocking(struct remote_desc * const remote);
static void nDPIsrvd_buffer_array_copy(void * dst, const void * src)
{
- struct nDPIsrvd_buffer * const buf_dst = (struct nDPIsrvd_buffer *)dst;
- struct nDPIsrvd_buffer const * const buf_src = (struct nDPIsrvd_buffer *)src;
+ struct nDPIsrvd_write_buffer * const buf_dst = (struct nDPIsrvd_write_buffer *)dst;
+ struct nDPIsrvd_write_buffer const * const buf_src = (struct nDPIsrvd_write_buffer *)src;
- buf_dst->ptr.raw = NULL;
- if (nDPIsrvd_buffer_init(buf_dst, buf_src->used) != 0)
+ buf_dst->buf.ptr.raw = NULL;
+ if (nDPIsrvd_buffer_init(&buf_dst->buf, buf_src->buf.used) != 0)
{
return;
}
- buf_dst->json_string_start = buf_src->json_string_start;
- buf_dst->json_string_length = buf_src->json_string_length;
- buf_dst->json_string = buf_src->json_string;
- buf_dst->used = buf_src->used;
- memcpy(buf_dst->ptr.raw, buf_src->ptr.raw, buf_src->used);
+ buf_dst->written = buf_src->written;
+ buf_dst->buf.used = buf_src->buf.used;
+ memcpy(buf_dst->buf.ptr.raw, buf_src->buf.ptr.raw, buf_src->buf.used);
}
static void nDPIsrvd_buffer_array_dtor(void * elt)
{
- struct nDPIsrvd_buffer * const buf = (struct nDPIsrvd_buffer *)elt;
+ struct nDPIsrvd_write_buffer * const buf_dst = (struct nDPIsrvd_write_buffer *)elt;
- nDPIsrvd_buffer_free(buf);
+ nDPIsrvd_buffer_free(&buf_dst->buf);
}
-static const UT_icd nDPIsrvd_buffer_array_icd = {sizeof(struct nDPIsrvd_buffer),
+static const UT_icd nDPIsrvd_buffer_array_icd = {sizeof(struct nDPIsrvd_write_buffer),
NULL,
nDPIsrvd_buffer_array_copy,
nDPIsrvd_buffer_array_dtor};
@@ -142,36 +153,93 @@ void nDPIsrvd_memprof_log(char const * const format, ...)
#endif
#endif
-static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, nDPIsrvd_ull json_string_length)
+static struct nDPIsrvd_json_buffer * get_read_buffer(struct remote_desc * const remote)
+{
+ switch (remote->sock_type)
+ {
+ case COLLECTOR_UN:
+ return &remote->event_collector_un.main_read_buffer;
+
+ case DISTRIBUTOR_UN:
+ case DISTRIBUTOR_IN:
+ return NULL;
+ }
+
+ return NULL;
+}
+
+static struct nDPIsrvd_write_buffer * get_write_buffer(struct remote_desc * const remote)
+{
+ switch (remote->sock_type)
+ {
+ case COLLECTOR_UN:
+ return NULL;
+
+ case DISTRIBUTOR_UN:
+ return &remote->event_distributor_un.main_write_buffer;
+
+ case DISTRIBUTOR_IN:
+ return &remote->event_distributor_in.main_write_buffer;
+ }
+
+ return NULL;
+}
+
+static UT_array * get_additional_write_buffers(struct remote_desc * const remote)
{
- struct nDPIsrvd_buffer buf_src = {};
+ switch (remote->sock_type)
+ {
+ case COLLECTOR_UN:
+ return NULL;
- if (utarray_len(remote->buf_cache) >= nDPIsrvd_options.cache_array_length)
+ case DISTRIBUTOR_UN:
+ return remote->event_distributor_un.additional_write_buffers;
+
+ case DISTRIBUTOR_IN:
+ return remote->event_distributor_in.additional_write_buffers;
+ }
+
+ return NULL;
+}
+
+static int add_to_additional_write_buffers(struct remote_desc * const remote,
+ uint8_t * const buf,
+ nDPIsrvd_ull json_string_length)
+{
+ struct nDPIsrvd_write_buffer buf_src = {};
+ UT_array * const additional_write_buffers = get_additional_write_buffers(remote);
+
+ if (additional_write_buffers == NULL)
{
- if (nDPIsrvd_options.cache_fallback_to_blocking == 0)
+ return -1;
+ }
+
+ if (utarray_len(additional_write_buffers) >= nDPIsrvd_options.max_write_buffers)
+ {
+ if (nDPIsrvd_options.bufferbloat_fallback_to_blocking == 0)
{
logger_nDPIsrvd(remote,
- "Buffer cache limit for",
+ "Buffer limit for",
"for reached, remote too slow: %u lines",
- utarray_len(remote->buf_cache));
+ utarray_len(additional_write_buffers));
return -1;
}
else
{
logger_nDPIsrvd(remote,
- "Buffer JSON string cache limit for",
+ "Buffer limit for",
"reached, falling back to blocking I/O: %u lines",
- utarray_len(remote->buf_cache));
- if (drain_cache_blocking(remote) != 0)
+ utarray_len(additional_write_buffers));
+ if (drain_write_buffers_blocking(remote) != 0)
{
return -1;
}
}
}
- buf_src.ptr.raw = buf;
- buf_src.used = buf_src.max = buf_src.json_string_length = json_string_length;
- utarray_push_back(remote->buf_cache, &buf_src);
+ buf_src.buf.ptr.raw = buf;
+ buf_src.buf.used = buf_src.buf.max = json_string_length;
+ utarray_push_back(additional_write_buffers, &buf_src);
return 0;
}
@@ -216,13 +284,20 @@ static void logger_nDPIsrvd(struct remote_desc const * const remote,
static int drain_main_buffer(struct remote_desc * const remote)
{
- if (remote->buf.used == 0)
+ struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(remote);
+
+ if (write_buffer == NULL)
+ {
+ return -1;
+ }
+
+ if (write_buffer->buf.used == 0)
{
return 0;
}
errno = 0;
- ssize_t bytes_written = write(remote->fd, remote->buf.ptr.raw, remote->buf.used);
+ ssize_t bytes_written = write(remote->fd, write_buffer->buf.ptr.raw, write_buffer->buf.used);
if (errno == EAGAIN)
{
return 0;
@@ -237,32 +312,37 @@ static int drain_main_buffer(struct remote_desc * const remote)
logger_nDPIsrvd(remote, "Distributor connection", "closed");
return -1;
}
- if ((size_t)bytes_written < remote->buf.used)
+ if ((size_t)bytes_written < write_buffer->buf.used)
{
#if 0
logger_nDPIsrvd(
remote, "Distributor", "wrote less than expected: %zd < %zu", bytes_written, remote->buf.used);
#endif
- memmove(remote->buf.ptr.raw, remote->buf.ptr.raw + bytes_written, remote->buf.used - bytes_written);
+ memmove(write_buffer->buf.ptr.raw,
+ write_buffer->buf.ptr.raw + bytes_written,
+ write_buffer->buf.used - bytes_written);
}
- remote->buf.used -= bytes_written;
+ write_buffer->buf.used -= bytes_written;
return 0;
}
-static int drain_cache(struct remote_desc * const remote)
+static int drain_write_buffers(struct remote_desc * const remote)
{
+ UT_array * const additional_write_buffers = get_additional_write_buffers(remote);
+
errno = 0;
- if (drain_main_buffer(remote) != 0)
+ if (drain_main_buffer(remote) != 0 || additional_write_buffers == NULL)
{
return -1;
}
- while (utarray_len(remote->buf_cache) > 0)
+ while (utarray_len(additional_write_buffers) > 0)
{
- struct nDPIsrvd_buffer * buf = (struct nDPIsrvd_buffer *)utarray_front(remote->buf_cache);
- ssize_t written = write(remote->fd, buf->ptr.raw + buf->json_string_start, buf->json_string_length);
+ struct nDPIsrvd_write_buffer * buf = (struct nDPIsrvd_write_buffer *)utarray_front(additional_write_buffers);
+ ssize_t written = write(remote->fd, buf->buf.ptr.raw + buf->written, buf->buf.used - buf->written);
+
switch (written)
{
case -1:
@@ -274,11 +354,10 @@ static int drain_cache(struct remote_desc * const remote)
case 0:
return -1;
default:
- buf->json_string_start += written;
- buf->json_string_length -= written;
- if (buf->json_string_length == 0)
+ buf->written += written;
+ if (buf->written == buf->buf.max)
{
- utarray_erase(remote->buf_cache, 0, 1);
+ utarray_erase(additional_write_buffers, 0, 1);
}
break;
}
@@ -287,7 +366,7 @@ static int drain_cache(struct remote_desc * const remote)
return 0;
}
-static int drain_cache_blocking(struct remote_desc * const remote)
+static int drain_write_buffers_blocking(struct remote_desc * const remote)
{
int retval = 0;
@@ -296,9 +375,9 @@ static int drain_cache_blocking(struct remote_desc * const remote)
logger_nDPIsrvd(remote, "Error setting distributor", "fd flags to blocking mode: %s", strerror(errno));
return -1;
}
- if (drain_cache(remote) != 0)
+ if (drain_write_buffers(remote) != 0)
{
- logger_nDPIsrvd(remote, "Could not drain buffer cache for", "in blocking I/O: %s", strerror(errno));
+ logger_nDPIsrvd(remote, "Could not drain buffers for", "in blocking I/O: %s", strerror(errno));
retval = -1;
}
if (fcntl_add_flags(remote->fd, O_NONBLOCK) != 0)
@@ -312,17 +391,19 @@ static int drain_cache_blocking(struct remote_desc * const remote)
static int handle_outgoing_data(int epollfd, struct remote_desc * const remote)
{
- if (remote->sock_type != DISTRIBUTOR_UN && remote->sock_type != DISTRIBUTOR_IN)
+ UT_array * const additional_write_buffers = get_additional_write_buffers(remote);
+
+ if (additional_write_buffers == NULL)
{
return -1;
}
- if (drain_cache(remote) != 0)
+ if (drain_write_buffers(remote) != 0)
{
- logger_nDPIsrvd(remote, "Could not drain buffer cache for", ": %s", strerror(errno));
+ logger_nDPIsrvd(remote, "Could not drain buffers for", ": %s", strerror(errno));
disconnect_client(epollfd, remote);
return -1;
}
- if (utarray_len(remote->buf_cache) == 0)
+ if (utarray_len(additional_write_buffers) == 0)
{
return del_out_event(epollfd, remote);
}
@@ -517,14 +598,42 @@ static struct remote_desc * get_remote_descriptor(enum sock_type type, int remot
if (remotes.desc[i].fd == -1)
{
remotes.desc_used++;
- if (remotes.desc[i].buf_cache == NULL)
+
+ struct nDPIsrvd_write_buffer * write_buffer = NULL;
+ UT_array ** additional_write_buffers = NULL;
+
+ switch (type)
{
- utarray_new(remotes.desc[i].buf_cache, &nDPIsrvd_buffer_array_icd);
+ case COLLECTOR_UN:
+ if (nDPIsrvd_json_buffer_init(&remotes.desc[i].event_collector_un.main_read_buffer,
+ max_buffer_size) != 0)
+ {
+ return NULL;
+ }
+ break;
+ case DISTRIBUTOR_UN:
+ write_buffer = &remotes.desc[i].event_distributor_un.main_write_buffer;
+ additional_write_buffers = &remotes.desc[i].event_distributor_un.additional_write_buffers;
+ break;
+ case DISTRIBUTOR_IN:
+ write_buffer = &remotes.desc[i].event_distributor_in.main_write_buffer;
+ additional_write_buffers = &remotes.desc[i].event_distributor_in.additional_write_buffers;
+ break;
}
- if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, max_buffer_size) != 0 || remotes.desc[i].buf_cache == NULL)
+
+ if (additional_write_buffers != NULL && *additional_write_buffers == NULL)
+ {
+ utarray_new(*additional_write_buffers, &nDPIsrvd_buffer_array_icd);
+ if (*additional_write_buffers == NULL)
+ {
+ return NULL;
+ }
+ }
+ if (write_buffer != NULL && nDPIsrvd_buffer_init(&write_buffer->buf, max_buffer_size) != 0)
{
return NULL;
}
+
remotes.desc[i].sock_type = type;
remotes.desc[i].fd = remote_fd;
return &remotes.desc[i];
@@ -538,12 +647,28 @@ static void free_remotes(void)
{
for (size_t i = 0; i < remotes.desc_size; ++i)
{
- if (remotes.desc[i].buf_cache != NULL)
+ switch (remotes.desc[i].sock_type)
{
- utarray_free(remotes.desc[i].buf_cache);
- remotes.desc[i].buf_cache = NULL;
+ case COLLECTOR_UN:
+ nDPIsrvd_json_buffer_free(&remotes.desc[i].event_collector_un.main_read_buffer);
+ break;
+ case DISTRIBUTOR_UN:
+ if (remotes.desc[i].event_distributor_un.additional_write_buffers != NULL)
+ {
+ utarray_free(remotes.desc[i].event_distributor_un.additional_write_buffers);
+ remotes.desc[i].event_distributor_un.additional_write_buffers = NULL;
+ }
+ nDPIsrvd_buffer_free(&remotes.desc[i].event_distributor_un.main_write_buffer.buf);
+ break;
+ case DISTRIBUTOR_IN:
+ if (remotes.desc[i].event_distributor_in.additional_write_buffers != NULL)
+ {
+ utarray_free(remotes.desc[i].event_distributor_in.additional_write_buffers);
+ remotes.desc[i].event_distributor_in.additional_write_buffers = NULL;
+ }
+ nDPIsrvd_buffer_free(&remotes.desc[i].event_distributor_in.main_write_buffer.buf);
+ break;
}
- nDPIsrvd_buffer_free(&remotes.desc[i].buf);
}
}
@@ -610,10 +735,25 @@ static void disconnect_client(int epollfd, struct remote_desc * const current)
{
case COLLECTOR_UN:
logger_nDPIsrvd(current, "Error closing collector connection", ": %s", strerror(errno));
+ nDPIsrvd_json_buffer_free(&current->event_collector_un.main_read_buffer);
break;
case DISTRIBUTOR_UN:
+ logger_nDPIsrvd(current, "Error closing distributor connection", ": %s", strerror(errno));
+ if (current->event_distributor_un.additional_write_buffers != NULL)
+ {
+ utarray_clear(current->event_distributor_un.additional_write_buffers);
+ }
+ nDPIsrvd_buffer_free(&current->event_distributor_un.main_write_buffer.buf);
+ current->event_distributor_un.main_write_buffer.written = 0;
+ break;
case DISTRIBUTOR_IN:
logger_nDPIsrvd(current, "Error closing distributor connection", ": %s", strerror(errno));
+ if (current->event_distributor_in.additional_write_buffers != NULL)
+ {
+ utarray_clear(current->event_distributor_in.additional_write_buffers);
+ }
+ nDPIsrvd_buffer_free(&current->event_distributor_in.main_write_buffer.buf);
+ current->event_distributor_in.main_write_buffer.written = 0;
break;
}
}
@@ -625,11 +765,6 @@ static void disconnect_client(int epollfd, struct remote_desc * const current)
current->fd = -1;
remotes.desc_used--;
}
- if (current->buf_cache != NULL)
- {
- utarray_clear(current->buf_cache);
- }
- nDPIsrvd_buffer_free(&current->buf);
}
static int nDPIsrvd_parse_options(int argc, char ** argv)
@@ -684,14 +819,14 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
nDPIsrvd_options.group = strdup(optarg);
break;
case 'C':
- if (str_value_to_ull(optarg, &nDPIsrvd_options.cache_array_length) != CONVERSION_OK)
+ if (str_value_to_ull(optarg, &nDPIsrvd_options.max_write_buffers) != CONVERSION_OK)
{
fprintf(stderr, "%s: Argument for `-C' is not a number: %s\n", argv[0], optarg);
return 1;
}
break;
case 'D':
- nDPIsrvd_options.cache_fallback_to_blocking = 0;
+ nDPIsrvd_options.bufferbloat_fallback_to_blocking = 0;
break;
case 'v':
fprintf(stderr, "%s", get_nDPId_version());
@@ -952,21 +1087,27 @@ static int new_connection(int epollfd, int eventfd)
static int handle_collector_protocol(int epollfd, struct remote_desc * const current)
{
+ struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current);
char * json_str_start = NULL;
- if (current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
+ if (json_read_buffer == NULL)
+ {
+ return 1;
+ }
+
+ if (json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
{
logger_nDPIsrvd(current,
"BUG: Collector connection",
"JSON invalid opening character: '%c'",
- current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]);
+ json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]);
disconnect_client(epollfd, current);
return 1;
}
errno = 0;
- current->event_collector_un.json_bytes = strtoull((char *)current->buf.ptr.text, &json_str_start, 10);
- current->event_collector_un.json_bytes += json_str_start - current->buf.ptr.text;
+ current->event_collector_un.json_bytes = strtoull(json_read_buffer->buf.ptr.text, &json_str_start, 10);
+ current->event_collector_un.json_bytes += json_str_start - json_read_buffer->buf.ptr.text;
if (errno == ERANGE)
{
@@ -975,18 +1116,18 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
return 1;
}
- if (json_str_start == current->buf.ptr.text)
+ if (json_str_start == json_read_buffer->buf.ptr.text)
{
logger_nDPIsrvd(current,
"BUG: Collector connection",
"missing JSON string length in protocol preamble: \"%.*s\"",
NETWORK_BUFFER_LENGTH_DIGITS,
- current->buf.ptr.text);
+ json_read_buffer->buf.ptr.text);
disconnect_client(epollfd, current);
return 1;
}
- if (json_str_start - current->buf.ptr.text != NETWORK_BUFFER_LENGTH_DIGITS)
+ if (json_str_start - json_read_buffer->buf.ptr.text != NETWORK_BUFFER_LENGTH_DIGITS)
{
logger_nDPIsrvd(current,
"BUG: Collector connection",
@@ -994,33 +1135,33 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
"%ld "
"bytes",
NETWORK_BUFFER_LENGTH_DIGITS,
- (long int)(json_str_start - current->buf.ptr.text));
+ (long int)(json_str_start - json_read_buffer->buf.ptr.text));
}
- if (current->event_collector_un.json_bytes > current->buf.max)
+ if (current->event_collector_un.json_bytes > json_read_buffer->buf.max)
{
logger_nDPIsrvd(current,
"BUG: Collector connection",
"JSON string too big: %llu > %zu",
current->event_collector_un.json_bytes,
- current->buf.max);
+ json_read_buffer->buf.max);
disconnect_client(epollfd, current);
return 1;
}
- if (current->event_collector_un.json_bytes > current->buf.used)
+ if (current->event_collector_un.json_bytes > json_read_buffer->buf.used)
{
return 1;
}
- if (current->buf.ptr.text[current->event_collector_un.json_bytes - 2] != '}' ||
- current->buf.ptr.text[current->event_collector_un.json_bytes - 1] != '\n')
+ if (json_read_buffer->buf.ptr.text[current->event_collector_un.json_bytes - 2] != '}' ||
+ json_read_buffer->buf.ptr.text[current->event_collector_un.json_bytes - 1] != '\n')
{
logger_nDPIsrvd(current,
"BUG: Collector connection",
"invalid JSON string: %.*s",
(int)current->event_collector_un.json_bytes,
- current->buf.ptr.text);
+ json_read_buffer->buf.ptr.text);
disconnect_client(epollfd, current);
return 1;
}
@@ -1030,7 +1171,9 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
static int handle_incoming_data(int epollfd, struct remote_desc * const current)
{
- if (current->sock_type != COLLECTOR_UN)
+ struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current);
+
+ if (json_read_buffer == NULL)
{
unsigned char garbage = 0;
@@ -1047,18 +1190,19 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
}
/* read JSON strings (or parts) from the UNIX socket (collecting) */
- if (current->buf.used == current->buf.max)
+ if (json_read_buffer->buf.used == json_read_buffer->buf.max)
{
logger_nDPIsrvd(current,
"Collector connection",
"read buffer (%zu bytes) full. No more read possible.",
- current->buf.max);
+ json_read_buffer->buf.max);
}
else
{
errno = 0;
- ssize_t bytes_read =
- read(current->fd, current->buf.ptr.raw + current->buf.used, current->buf.max - current->buf.used);
+ ssize_t bytes_read = read(current->fd,
+ json_read_buffer->buf.ptr.raw + json_read_buffer->buf.used,
+ json_read_buffer->buf.max - json_read_buffer->buf.used);
if (bytes_read < 0 || errno != 0)
{
logger_nDPIsrvd(current, "Could not read remote", ": %s", strerror(errno));
@@ -1071,10 +1215,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
disconnect_client(epollfd, current);
return 1;
}
- current->buf.used += bytes_read;
+ json_read_buffer->buf.used += bytes_read;
}
- while (current->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1)
+ while (json_read_buffer->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1)
{
if (handle_collector_protocol(epollfd, current) != 0)
{
@@ -1083,19 +1227,18 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
for (size_t i = 0; i < remotes.desc_size; ++i)
{
- if (remotes.desc[i].fd < 0)
- {
- continue;
- }
- if (remotes.desc[i].sock_type != DISTRIBUTOR_UN && remotes.desc[i].sock_type != DISTRIBUTOR_IN)
+ struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(&remotes.desc[i]);
+ UT_array * const additional_write_buffers = get_additional_write_buffers(&remotes.desc[i]);
+
+ if (remotes.desc[i].fd < 0 || write_buffer == NULL || additional_write_buffers == NULL)
{
continue;
}
- if (current->event_collector_un.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used ||
- utarray_len(remotes.desc[i].buf_cache) > 0)
+ if (current->event_collector_un.json_bytes > write_buffer->buf.max - write_buffer->buf.used ||
+ utarray_len(additional_write_buffers) > 0)
{
- if (utarray_len(remotes.desc[i].buf_cache) == 0)
+ if (utarray_len(additional_write_buffers) == 0)
{
#if 0
logger_nDPIsrvd(&remotes.desc[i],
@@ -1114,7 +1257,9 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
continue;
}
}
- if (add_to_cache(&remotes.desc[i], current->buf.ptr.raw, current->event_collector_un.json_bytes) != 0)
+ if (add_to_additional_write_buffers(&remotes.desc[i],
+ json_read_buffer->buf.ptr.raw,
+ current->event_collector_un.json_bytes) != 0)
{
disconnect_client(epollfd, &remotes.desc[i]);
continue;
@@ -1122,10 +1267,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
}
else
{
- memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used,
- current->buf.ptr.raw,
+ memcpy(write_buffer->buf.ptr.raw + write_buffer->buf.used,
+ json_read_buffer->buf.ptr.raw,
current->event_collector_un.json_bytes);
- remotes.desc[i].buf.used += current->event_collector_un.json_bytes;
+ write_buffer->buf.used += current->event_collector_un.json_bytes;
}
if (drain_main_buffer(&remotes.desc[i]) != 0)
@@ -1134,10 +1279,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
}
}
- memmove(current->buf.ptr.raw,
- current->buf.ptr.raw + current->event_collector_un.json_bytes,
- current->buf.used - current->event_collector_un.json_bytes);
- current->buf.used -= current->event_collector_un.json_bytes;
+ memmove(json_read_buffer->buf.ptr.raw,
+ json_read_buffer->buf.ptr.raw + current->event_collector_un.json_bytes,
+ json_read_buffer->buf.used - current->event_collector_un.json_bytes);
+ json_read_buffer->buf.used -= current->event_collector_un.json_bytes;
current->event_collector_un.json_bytes = 0;
}
@@ -1230,7 +1375,7 @@ static int mainloop(int epollfd)
switch (current->sock_type)
{
case COLLECTOR_UN:
- logger_nDPIsrvd(current, "Collector disconnected", "closed");
+ logger_nDPIsrvd(current, "Collector connection", "closed");
break;
case DISTRIBUTOR_UN:
case DISTRIBUTOR_IN: