aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlns <matzeton@googlemail.com>2022-04-16 23:21:24 +0200
committerlns <matzeton@googlemail.com>2022-04-16 23:21:24 +0200
commitc283b89afda98fdcee81fe1b9634f10af1077878 (patch)
tree725a3496177903258069a2d11536c348722d79b8
parentdb83f82d29df4fd0dfe638fad305366fb265edb8 (diff)
Refactored buffer subsystem.
Signed-off-by: lns <matzeton@googlemail.com>
-rw-r--r--config.h2
-rw-r--r--dependencies/nDPIsrvd.h79
-rw-r--r--nDPId-test.c17
-rw-r--r--nDPIsrvd.c353
4 files changed, 311 insertions, 140 deletions
diff --git a/config.h b/config.h
index e4f90f5d3..2f37bcc8b 100644
--- a/config.h
+++ b/config.h
@@ -39,6 +39,6 @@
/* nDPIsrvd default config options */
#define nDPIsrvd_PIDFILE "/tmp/ndpisrvd.pid"
#define nDPIsrvd_MAX_REMOTE_DESCRIPTORS 32
-#define nDPIsrvd_CACHE_ARRAY_LENGTH 256
+#define nDPIsrvd_MAX_WRITE_BUFFERS 1024
#endif
diff --git a/dependencies/nDPIsrvd.h b/dependencies/nDPIsrvd.h
index 44bd4703c..05f41335c 100644
--- a/dependencies/nDPIsrvd.h
+++ b/dependencies/nDPIsrvd.h
@@ -196,6 +196,11 @@ struct nDPIsrvd_buffer
} ptr;
size_t used;
size_t max;
+};
+
+struct nDPIsrvd_json_buffer
+{
+ struct nDPIsrvd_buffer buf;
char * json_string;
size_t json_string_start;
nDPIsrvd_ull json_string_length;
@@ -221,7 +226,7 @@ struct nDPIsrvd_socket
instance_cleanup_callback instance_cleanup_callback;
flow_cleanup_callback flow_cleanup_callback;
- struct nDPIsrvd_buffer buffer;
+ struct nDPIsrvd_json_buffer buffer;
struct nDPIsrvd_jsmn jsmn;
/* easy and fast JSON key/value access via hash table and a static array */
@@ -375,9 +380,6 @@ static inline int nDPIsrvd_buffer_init(struct nDPIsrvd_buffer * const buffer, si
return 1;
}
- buffer->json_string_start = 0;
- buffer->json_string_length = 0ull;
- buffer->json_string = NULL;
buffer->used = 0;
buffer->max = buffer_size;
@@ -390,6 +392,24 @@ static inline void nDPIsrvd_buffer_free(struct nDPIsrvd_buffer * const buffer)
buffer->ptr.raw = NULL;
}
+static inline int nDPIsrvd_json_buffer_init(struct nDPIsrvd_json_buffer * const json_buffer, size_t json_buffer_size)
+{
+ int ret = nDPIsrvd_buffer_init(&json_buffer->buf, json_buffer_size);
+ if (ret == 0)
+ {
+ json_buffer->json_string_start = 0ul;
+ json_buffer->json_string_length = 0ull;
+ json_buffer->json_string = NULL;
+ }
+
+ return ret;
+}
+
+static inline void nDPIsrvd_json_buffer_free(struct nDPIsrvd_json_buffer * const json_buffer)
+{
+ nDPIsrvd_buffer_free(&json_buffer->buf);
+}
+
static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_data_size,
size_t instance_user_data_size,
size_t thread_user_data_size,
@@ -409,7 +429,7 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_d
if (sock != NULL)
{
sock->fd = -1;
- if (nDPIsrvd_buffer_init(&sock->buffer, NETWORK_BUFFER_MAX_SIZE) != 0)
+ if (nDPIsrvd_json_buffer_init(&sock->buffer, NETWORK_BUFFER_MAX_SIZE) != 0)
{
goto error;
}
@@ -435,7 +455,7 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_d
return sock;
error:
- nDPIsrvd_buffer_free(&sock->buffer);
+ nDPIsrvd_json_buffer_free(&sock->buffer);
nDPIsrvd_socket_free(&sock);
return NULL;
}
@@ -545,7 +565,7 @@ static inline void nDPIsrvd_socket_free(struct nDPIsrvd_socket ** const sock)
}
(*sock)->instance_table = NULL;
- nDPIsrvd_buffer_free(&(*sock)->buffer);
+ nDPIsrvd_json_buffer_free(&(*sock)->buffer);
nDPIsrvd_free(*sock);
*sock = NULL;
@@ -641,12 +661,13 @@ static inline enum nDPIsrvd_connect_return nDPIsrvd_connect(struct nDPIsrvd_sock
static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * const sock)
{
- if (sock->buffer.used == sock->buffer.max)
+ if (sock->buffer.buf.used == sock->buffer.buf.max)
{
return READ_OK;
}
- ssize_t bytes_read = read(sock->fd, sock->buffer.ptr.raw + sock->buffer.used, sock->buffer.max - sock->buffer.used);
+ ssize_t bytes_read =
+ read(sock->fd, sock->buffer.buf.ptr.raw + sock->buffer.buf.used, sock->buffer.buf.max - sock->buffer.buf.used);
if (bytes_read == 0)
{
@@ -657,7 +678,7 @@ static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * c
return READ_ERROR;
}
- sock->buffer.used += bytes_read;
+ sock->buffer.buf.used += bytes_read;
return READ_OK;
}
@@ -1080,49 +1101,49 @@ static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock,
return 0;
}
-static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_buffer * const buffer,
+static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_json_buffer * const json_buffer,
struct nDPIsrvd_jsmn * const jsmn)
{
- if (buffer->used < NETWORK_BUFFER_LENGTH_DIGITS + 1)
+ if (json_buffer->buf.used < NETWORK_BUFFER_LENGTH_DIGITS + 1)
{
return PARSE_NEED_MORE_DATA;
}
- if (buffer->ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
+ if (json_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
{
return PARSE_INVALID_OPENING_CHAR;
}
errno = 0;
- buffer->json_string_length = strtoull((const char *)buffer->ptr.text, &buffer->json_string, 10);
- buffer->json_string_length += buffer->json_string - buffer->ptr.text;
- buffer->json_string_start = buffer->json_string - buffer->ptr.text;
+ json_buffer->json_string_length = strtoull((const char *)json_buffer->buf.ptr.text, &json_buffer->json_string, 10);
+ json_buffer->json_string_length += json_buffer->json_string - json_buffer->buf.ptr.text;
+ json_buffer->json_string_start = json_buffer->json_string - json_buffer->buf.ptr.text;
if (errno == ERANGE)
{
return PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT;
}
- if (buffer->json_string == buffer->ptr.text)
+ if (json_buffer->json_string == json_buffer->buf.ptr.text)
{
return PARSE_SIZE_MISSING;
}
- if (buffer->json_string_length > buffer->max)
+ if (json_buffer->json_string_length > json_buffer->buf.max)
{
return PARSE_STRING_TOO_BIG;
}
- if (buffer->json_string_length > buffer->used)
+ if (json_buffer->json_string_length > json_buffer->buf.used)
{
return PARSE_NEED_MORE_DATA;
}
- if (buffer->ptr.text[buffer->json_string_length - 2] != '}' ||
- buffer->ptr.text[buffer->json_string_length - 1] != '\n')
+ if (json_buffer->buf.ptr.text[json_buffer->json_string_length - 2] != '}' ||
+ json_buffer->buf.ptr.text[json_buffer->json_string_length - 1] != '\n')
{
return PARSE_INVALID_CLOSING_CHAR;
}
jsmn_init(&jsmn->parser);
jsmn->tokens_found = jsmn_parse(&jsmn->parser,
- buffer->ptr.text + buffer->json_string_start,
- buffer->json_string_length - buffer->json_string_start,
+ json_buffer->buf.ptr.text + json_buffer->json_string_start,
+ json_buffer->json_string_length - json_buffer->json_string_start,
jsmn->tokens,
nDPIsrvd_MAX_JSON_TOKENS);
if (jsmn->tokens_found < 0 || jsmn->tokens[0].type != JSMN_OBJECT)
@@ -1143,12 +1164,14 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_buf
return PARSE_OK;
}
-static void nDPIsrvd_drain_buffer(struct nDPIsrvd_buffer * const buffer)
+static void nDPIsrvd_drain_buffer(struct nDPIsrvd_json_buffer * const json_buffer)
{
- memmove(buffer->ptr.raw, buffer->ptr.raw + buffer->json_string_length, buffer->used - buffer->json_string_length);
- buffer->used -= buffer->json_string_length;
- buffer->json_string_length = 0;
- buffer->json_string_start = 0;
+ memmove(json_buffer->buf.ptr.raw,
+ json_buffer->buf.ptr.raw + json_buffer->json_string_length,
+ json_buffer->buf.used - json_buffer->json_string_length);
+ json_buffer->buf.used -= json_buffer->json_string_length;
+ json_buffer->json_string_length = 0;
+ json_buffer->json_string_start = 0;
}
static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_socket * const sock)
diff --git a/nDPId-test.c b/nDPId-test.c
index 90f02ae65..03d69c237 100644
--- a/nDPId-test.c
+++ b/nDPId-test.c
@@ -255,15 +255,15 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
error:
if (mock_test_desc != NULL)
{
- drain_cache_blocking(mock_test_desc);
+ drain_write_buffers_blocking(mock_test_desc);
}
if (mock_null_desc != NULL)
{
- drain_cache_blocking(mock_null_desc);
+ drain_write_buffers_blocking(mock_null_desc);
}
if (mock_arpa_desc != NULL)
{
- drain_cache_blocking(mock_arpa_desc);
+ drain_write_buffers_blocking(mock_arpa_desc);
}
del_event(epollfd, mock_pipefds[PIPE_nDPIsrvd]);
@@ -640,7 +640,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
"Problematic JSON string (start: %zu, length: %llu, buffer usage: %zu): %.*s",
mock_sock->buffer.json_string_start,
mock_sock->buffer.json_string_length,
- mock_sock->buffer.used,
+ mock_sock->buffer.buf.used,
(int)mock_sock->buffer.json_string_length,
mock_sock->buffer.json_string);
THREAD_ERROR_GOTO(trv);
@@ -992,13 +992,15 @@ int main(int argc, char ** argv)
unsigned long long int total_alloc_bytes =
#ifdef ENABLE_ZLIB
- (unsigned long long int)(ndpi_memory_alloc_bytes - zlib_compression_bytes - (zlib_compressions * sizeof(struct nDPId_detection_data)));
+ (unsigned long long int)(ndpi_memory_alloc_bytes - zlib_compression_bytes -
+ (zlib_compressions * sizeof(struct nDPId_detection_data)));
#else
(unsigned long long int)ndpi_memory_alloc_bytes;
#endif
unsigned long long int total_free_bytes =
#ifdef ENABLE_ZLIB
- (unsigned long long int)(ndpi_memory_free_bytes - zlib_compression_bytes - (zlib_compressions * sizeof(struct nDPId_detection_data)));
+ (unsigned long long int)(ndpi_memory_free_bytes - zlib_compression_bytes -
+ (zlib_compressions * sizeof(struct nDPId_detection_data)));
#else
(unsigned long long int)ndpi_memory_free_bytes;
#endif
@@ -1028,7 +1030,8 @@ int main(int argc, char ** argv)
total_free_bytes -
sizeof(struct nDPId_workflow) *
nDPId_options.reader_thread_count /* We do not want to take the workflow into account. */,
- total_alloc_count, total_free_count);
+ total_alloc_count,
+ total_free_count);
printf(
"~~ json string min len.......: %llu chars\n"
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index 05d3287e5..437ef5616 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -26,12 +26,17 @@ enum sock_type
DISTRIBUTOR_IN,
};
+struct nDPIsrvd_write_buffer
+{
+ struct nDPIsrvd_buffer buf;
+ size_t written;
+};
+
struct remote_desc
{
enum sock_type sock_type;
int fd;
- struct nDPIsrvd_buffer buf;
- UT_array * buf_cache;
+
union
{
struct
@@ -40,6 +45,8 @@ struct remote_desc
struct sockaddr_un peer;
unsigned long long int json_bytes;
pid_t pid;
+
+ struct nDPIsrvd_json_buffer main_read_buffer;
} event_collector_un;
struct
{
@@ -47,12 +54,18 @@ struct remote_desc
struct sockaddr_un peer;
pid_t pid;
char * user_name;
+
+ struct nDPIsrvd_write_buffer main_write_buffer;
+ UT_array * additional_write_buffers;
} event_distributor_un; /* UNIX socket */
struct
{
int distributor_sockfd;
struct sockaddr_in peer;
char peer_addr[INET_ADDRSTRLEN];
+
+ struct nDPIsrvd_write_buffer main_write_buffer;
+ UT_array * additional_write_buffers;
} event_distributor_in; /* TCP/IP socket */
};
};
@@ -81,11 +94,11 @@ static struct
nDPIsrvd_ull max_remote_descriptors;
char * user;
char * group;
- nDPIsrvd_ull cache_array_length;
- int cache_fallback_to_blocking;
+ nDPIsrvd_ull max_write_buffers;
+ int bufferbloat_fallback_to_blocking;
} nDPIsrvd_options = {.max_remote_descriptors = nDPIsrvd_MAX_REMOTE_DESCRIPTORS,
- .cache_array_length = nDPIsrvd_CACHE_ARRAY_LENGTH,
- .cache_fallback_to_blocking = 1};
+ .max_write_buffers = nDPIsrvd_MAX_WRITE_BUFFERS,
+ .bufferbloat_fallback_to_blocking = 1};
static void logger_nDPIsrvd(struct remote_desc const * const remote,
char const * const prefix,
@@ -97,34 +110,32 @@ static int add_in_event_fd(int epollfd, int fd);
static int add_in_event(int epollfd, struct remote_desc * const remote);
static int del_out_event(int epollfd, struct remote_desc * const remote);
static void disconnect_client(int epollfd, struct remote_desc * const current);
-static int drain_cache_blocking(struct remote_desc * const remote);
+static int drain_write_buffers_blocking(struct remote_desc * const remote);
static void nDPIsrvd_buffer_array_copy(void * dst, const void * src)
{
- struct nDPIsrvd_buffer * const buf_dst = (struct nDPIsrvd_buffer *)dst;
- struct nDPIsrvd_buffer const * const buf_src = (struct nDPIsrvd_buffer *)src;
+ struct nDPIsrvd_write_buffer * const buf_dst = (struct nDPIsrvd_write_buffer *)dst;
+ struct nDPIsrvd_write_buffer const * const buf_src = (struct nDPIsrvd_write_buffer *)src;
- buf_dst->ptr.raw = NULL;
- if (nDPIsrvd_buffer_init(buf_dst, buf_src->used) != 0)
+ buf_dst->buf.ptr.raw = NULL;
+ if (nDPIsrvd_buffer_init(&buf_dst->buf, buf_src->buf.used) != 0)
{
return;
}
- buf_dst->json_string_start = buf_src->json_string_start;
- buf_dst->json_string_length = buf_src->json_string_length;
- buf_dst->json_string = buf_src->json_string;
- buf_dst->used = buf_src->used;
- memcpy(buf_dst->ptr.raw, buf_src->ptr.raw, buf_src->used);
+ buf_dst->written = buf_src->written;
+ buf_dst->buf.used = buf_src->buf.used;
+ memcpy(buf_dst->buf.ptr.raw, buf_src->buf.ptr.raw, buf_src->buf.used);
}
static void nDPIsrvd_buffer_array_dtor(void * elt)
{
- struct nDPIsrvd_buffer * const buf = (struct nDPIsrvd_buffer *)elt;
+ struct nDPIsrvd_write_buffer * const buf_dst = (struct nDPIsrvd_write_buffer *)elt;
- nDPIsrvd_buffer_free(buf);
+ nDPIsrvd_buffer_free(&buf_dst->buf);
}
-static const UT_icd nDPIsrvd_buffer_array_icd = {sizeof(struct nDPIsrvd_buffer),
+static const UT_icd nDPIsrvd_buffer_array_icd = {sizeof(struct nDPIsrvd_write_buffer),
NULL,
nDPIsrvd_buffer_array_copy,
nDPIsrvd_buffer_array_dtor};
@@ -142,36 +153,93 @@ void nDPIsrvd_memprof_log(char const * const format, ...)
#endif
#endif
-static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, nDPIsrvd_ull json_string_length)
+static struct nDPIsrvd_json_buffer * get_read_buffer(struct remote_desc * const remote)
+{
+ switch (remote->sock_type)
+ {
+ case COLLECTOR_UN:
+ return &remote->event_collector_un.main_read_buffer;
+
+ case DISTRIBUTOR_UN:
+ case DISTRIBUTOR_IN:
+ return NULL;
+ }
+
+ return NULL;
+}
+
+static struct nDPIsrvd_write_buffer * get_write_buffer(struct remote_desc * const remote)
+{
+ switch (remote->sock_type)
+ {
+ case COLLECTOR_UN:
+ return NULL;
+
+ case DISTRIBUTOR_UN:
+ return &remote->event_distributor_un.main_write_buffer;
+
+ case DISTRIBUTOR_IN:
+ return &remote->event_distributor_in.main_write_buffer;
+ }
+
+ return NULL;
+}
+
+static UT_array * get_additional_write_buffers(struct remote_desc * const remote)
{
- struct nDPIsrvd_buffer buf_src = {};
+ switch (remote->sock_type)
+ {
+ case COLLECTOR_UN:
+ return NULL;
- if (utarray_len(remote->buf_cache) >= nDPIsrvd_options.cache_array_length)
+ case DISTRIBUTOR_UN:
+ return remote->event_distributor_un.additional_write_buffers;
+
+ case DISTRIBUTOR_IN:
+ return remote->event_distributor_in.additional_write_buffers;
+ }
+
+ return NULL;
+}
+
+static int add_to_additional_write_buffers(struct remote_desc * const remote,
+ uint8_t * const buf,
+ nDPIsrvd_ull json_string_length)
+{
+ struct nDPIsrvd_write_buffer buf_src = {};
+ UT_array * const additional_write_buffers = get_additional_write_buffers(remote);
+
+ if (additional_write_buffers == NULL)
{
- if (nDPIsrvd_options.cache_fallback_to_blocking == 0)
+ return -1;
+ }
+
+ if (utarray_len(additional_write_buffers) >= nDPIsrvd_options.max_write_buffers)
+ {
+ if (nDPIsrvd_options.bufferbloat_fallback_to_blocking == 0)
{
logger_nDPIsrvd(remote,
- "Buffer cache limit for",
+ "Buffer limit for",
"for reached, remote too slow: %u lines",
- utarray_len(remote->buf_cache));
+ utarray_len(additional_write_buffers));
return -1;
}
else
{
logger_nDPIsrvd(remote,
- "Buffer JSON string cache limit for",
+ "Buffer limit for",
"reached, falling back to blocking I/O: %u lines",
- utarray_len(remote->buf_cache));
- if (drain_cache_blocking(remote) != 0)
+ utarray_len(additional_write_buffers));
+ if (drain_write_buffers_blocking(remote) != 0)
{
return -1;
}
}
}
- buf_src.ptr.raw = buf;
- buf_src.used = buf_src.max = buf_src.json_string_length = json_string_length;
- utarray_push_back(remote->buf_cache, &buf_src);
+ buf_src.buf.ptr.raw = buf;
+ buf_src.buf.used = buf_src.buf.max = json_string_length;
+ utarray_push_back(additional_write_buffers, &buf_src);
return 0;
}
@@ -216,13 +284,20 @@ static void logger_nDPIsrvd(struct remote_desc const * const remote,
static int drain_main_buffer(struct remote_desc * const remote)
{
- if (remote->buf.used == 0)
+ struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(remote);
+
+ if (write_buffer == NULL)
+ {
+ return -1;
+ }
+
+ if (write_buffer->buf.used == 0)
{
return 0;
}
errno = 0;
- ssize_t bytes_written = write(remote->fd, remote->buf.ptr.raw, remote->buf.used);
+ ssize_t bytes_written = write(remote->fd, write_buffer->buf.ptr.raw, write_buffer->buf.used);
if (errno == EAGAIN)
{
return 0;
@@ -237,32 +312,37 @@ static int drain_main_buffer(struct remote_desc * const remote)
logger_nDPIsrvd(remote, "Distributor connection", "closed");
return -1;
}
- if ((size_t)bytes_written < remote->buf.used)
+ if ((size_t)bytes_written < write_buffer->buf.used)
{
#if 0
logger_nDPIsrvd(
remote, "Distributor", "wrote less than expected: %zd < %zu", bytes_written, remote->buf.used);
#endif
- memmove(remote->buf.ptr.raw, remote->buf.ptr.raw + bytes_written, remote->buf.used - bytes_written);
+ memmove(write_buffer->buf.ptr.raw,
+ write_buffer->buf.ptr.raw + bytes_written,
+ write_buffer->buf.used - bytes_written);
}
- remote->buf.used -= bytes_written;
+ write_buffer->buf.used -= bytes_written;
return 0;
}
-static int drain_cache(struct remote_desc * const remote)
+static int drain_write_buffers(struct remote_desc * const remote)
{
+ UT_array * const additional_write_buffers = get_additional_write_buffers(remote);
+
errno = 0;
- if (drain_main_buffer(remote) != 0)
+ if (drain_main_buffer(remote) != 0 || additional_write_buffers == NULL)
{
return -1;
}
- while (utarray_len(remote->buf_cache) > 0)
+ while (utarray_len(additional_write_buffers) > 0)
{
- struct nDPIsrvd_buffer * buf = (struct nDPIsrvd_buffer *)utarray_front(remote->buf_cache);
- ssize_t written = write(remote->fd, buf->ptr.raw + buf->json_string_start, buf->json_string_length);
+ struct nDPIsrvd_write_buffer * buf = (struct nDPIsrvd_write_buffer *)utarray_front(additional_write_buffers);
+ ssize_t written = write(remote->fd, buf->buf.ptr.raw + buf->written, buf->buf.used - buf->written);
+
switch (written)
{
case -1:
@@ -274,11 +354,10 @@ static int drain_cache(struct remote_desc * const remote)
case 0:
return -1;
default:
- buf->json_string_start += written;
- buf->json_string_length -= written;
- if (buf->json_string_length == 0)
+ buf->written += written;
+ if (buf->written == buf->buf.max)
{
- utarray_erase(remote->buf_cache, 0, 1);
+ utarray_erase(additional_write_buffers, 0, 1);
}
break;
}
@@ -287,7 +366,7 @@ static int drain_cache(struct remote_desc * const remote)
return 0;
}
-static int drain_cache_blocking(struct remote_desc * const remote)
+static int drain_write_buffers_blocking(struct remote_desc * const remote)
{
int retval = 0;
@@ -296,9 +375,9 @@ static int drain_cache_blocking(struct remote_desc * const remote)
logger_nDPIsrvd(remote, "Error setting distributor", "fd flags to blocking mode: %s", strerror(errno));
return -1;
}
- if (drain_cache(remote) != 0)
+ if (drain_write_buffers(remote) != 0)
{
- logger_nDPIsrvd(remote, "Could not drain buffer cache for", "in blocking I/O: %s", strerror(errno));
+ logger_nDPIsrvd(remote, "Could not drain buffers for", "in blocking I/O: %s", strerror(errno));
retval = -1;
}
if (fcntl_add_flags(remote->fd, O_NONBLOCK) != 0)
@@ -312,17 +391,19 @@ static int drain_cache_blocking(struct remote_desc * const remote)
static int handle_outgoing_data(int epollfd, struct remote_desc * const remote)
{
- if (remote->sock_type != DISTRIBUTOR_UN && remote->sock_type != DISTRIBUTOR_IN)
+ UT_array * const additional_write_buffers = get_additional_write_buffers(remote);
+
+ if (additional_write_buffers == NULL)
{
return -1;
}
- if (drain_cache(remote) != 0)
+ if (drain_write_buffers(remote) != 0)
{
- logger_nDPIsrvd(remote, "Could not drain buffer cache for", ": %s", strerror(errno));
+ logger_nDPIsrvd(remote, "Could not drain buffers for", ": %s", strerror(errno));
disconnect_client(epollfd, remote);
return -1;
}
- if (utarray_len(remote->buf_cache) == 0)
+ if (utarray_len(additional_write_buffers) == 0)
{
return del_out_event(epollfd, remote);
}
@@ -517,14 +598,42 @@ static struct remote_desc * get_remote_descriptor(enum sock_type type, int remot
if (remotes.desc[i].fd == -1)
{
remotes.desc_used++;
- if (remotes.desc[i].buf_cache == NULL)
+
+ struct nDPIsrvd_write_buffer * write_buffer = NULL;
+ UT_array ** additional_write_buffers = NULL;
+
+ switch (type)
{
- utarray_new(remotes.desc[i].buf_cache, &nDPIsrvd_buffer_array_icd);
+ case COLLECTOR_UN:
+ if (nDPIsrvd_json_buffer_init(&remotes.desc[i].event_collector_un.main_read_buffer,
+ max_buffer_size) != 0)
+ {
+ return NULL;
+ }
+ break;
+ case DISTRIBUTOR_UN:
+ write_buffer = &remotes.desc[i].event_distributor_un.main_write_buffer;
+ additional_write_buffers = &remotes.desc[i].event_distributor_un.additional_write_buffers;
+ break;
+ case DISTRIBUTOR_IN:
+ write_buffer = &remotes.desc[i].event_distributor_in.main_write_buffer;
+ additional_write_buffers = &remotes.desc[i].event_distributor_in.additional_write_buffers;
+ break;
}
- if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, max_buffer_size) != 0 || remotes.desc[i].buf_cache == NULL)
+
+ if (additional_write_buffers != NULL && *additional_write_buffers == NULL)
+ {
+ utarray_new(*additional_write_buffers, &nDPIsrvd_buffer_array_icd);
+ if (*additional_write_buffers == NULL)
+ {
+ return NULL;
+ }
+ }
+ if (write_buffer != NULL && nDPIsrvd_buffer_init(&write_buffer->buf, max_buffer_size) != 0)
{
return NULL;
}
+
remotes.desc[i].sock_type = type;
remotes.desc[i].fd = remote_fd;
return &remotes.desc[i];
@@ -538,12 +647,28 @@ static void free_remotes(void)
{
for (size_t i = 0; i < remotes.desc_size; ++i)
{
- if (remotes.desc[i].buf_cache != NULL)
+ switch (remotes.desc[i].sock_type)
{
- utarray_free(remotes.desc[i].buf_cache);
- remotes.desc[i].buf_cache = NULL;
+ case COLLECTOR_UN:
+ nDPIsrvd_json_buffer_free(&remotes.desc[i].event_collector_un.main_read_buffer);
+ break;
+ case DISTRIBUTOR_UN:
+ if (remotes.desc[i].event_distributor_un.additional_write_buffers != NULL)
+ {
+ utarray_free(remotes.desc[i].event_distributor_un.additional_write_buffers);
+ remotes.desc[i].event_distributor_un.additional_write_buffers = NULL;
+ }
+ nDPIsrvd_buffer_free(&remotes.desc[i].event_distributor_un.main_write_buffer.buf);
+ break;
+ case DISTRIBUTOR_IN:
+ if (remotes.desc[i].event_distributor_in.additional_write_buffers != NULL)
+ {
+ utarray_free(remotes.desc[i].event_distributor_in.additional_write_buffers);
+ remotes.desc[i].event_distributor_in.additional_write_buffers = NULL;
+ }
+ nDPIsrvd_buffer_free(&remotes.desc[i].event_distributor_in.main_write_buffer.buf);
+ break;
}
- nDPIsrvd_buffer_free(&remotes.desc[i].buf);
}
}
@@ -610,10 +735,25 @@ static void disconnect_client(int epollfd, struct remote_desc * const current)
{
case COLLECTOR_UN:
logger_nDPIsrvd(current, "Error closing collector connection", ": %s", strerror(errno));
+ nDPIsrvd_json_buffer_free(&current->event_collector_un.main_read_buffer);
break;
case DISTRIBUTOR_UN:
+ logger_nDPIsrvd(current, "Error closing distributor connection", ": %s", strerror(errno));
+ if (current->event_distributor_un.additional_write_buffers != NULL)
+ {
+ utarray_clear(current->event_distributor_un.additional_write_buffers);
+ }
+ nDPIsrvd_buffer_free(&current->event_distributor_un.main_write_buffer.buf);
+ current->event_distributor_un.main_write_buffer.written = 0;
+ break;
case DISTRIBUTOR_IN:
logger_nDPIsrvd(current, "Error closing distributor connection", ": %s", strerror(errno));
+ if (current->event_distributor_in.additional_write_buffers != NULL)
+ {
+ utarray_clear(current->event_distributor_in.additional_write_buffers);
+ }
+ nDPIsrvd_buffer_free(&current->event_distributor_in.main_write_buffer.buf);
+ current->event_distributor_in.main_write_buffer.written = 0;
break;
}
}
@@ -625,11 +765,6 @@ static void disconnect_client(int epollfd, struct remote_desc * const current)
current->fd = -1;
remotes.desc_used--;
}
- if (current->buf_cache != NULL)
- {
- utarray_clear(current->buf_cache);
- }
- nDPIsrvd_buffer_free(&current->buf);
}
static int nDPIsrvd_parse_options(int argc, char ** argv)
@@ -684,14 +819,14 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
nDPIsrvd_options.group = strdup(optarg);
break;
case 'C':
- if (str_value_to_ull(optarg, &nDPIsrvd_options.cache_array_length) != CONVERSION_OK)
+ if (str_value_to_ull(optarg, &nDPIsrvd_options.max_write_buffers) != CONVERSION_OK)
{
fprintf(stderr, "%s: Argument for `-C' is not a number: %s\n", argv[0], optarg);
return 1;
}
break;
case 'D':
- nDPIsrvd_options.cache_fallback_to_blocking = 0;
+ nDPIsrvd_options.bufferbloat_fallback_to_blocking = 0;
break;
case 'v':
fprintf(stderr, "%s", get_nDPId_version());
@@ -952,21 +1087,27 @@ static int new_connection(int epollfd, int eventfd)
static int handle_collector_protocol(int epollfd, struct remote_desc * const current)
{
+ struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current);
char * json_str_start = NULL;
- if (current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
+ if (json_read_buffer == NULL)
+ {
+ return 1;
+ }
+
+ if (json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
{
logger_nDPIsrvd(current,
"BUG: Collector connection",
"JSON invalid opening character: '%c'",
- current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]);
+ json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]);
disconnect_client(epollfd, current);
return 1;
}
errno = 0;
- current->event_collector_un.json_bytes = strtoull((char *)current->buf.ptr.text, &json_str_start, 10);
- current->event_collector_un.json_bytes += json_str_start - current->buf.ptr.text;
+ current->event_collector_un.json_bytes = strtoull(json_read_buffer->buf.ptr.text, &json_str_start, 10);
+ current->event_collector_un.json_bytes += json_str_start - json_read_buffer->buf.ptr.text;
if (errno == ERANGE)
{
@@ -975,18 +1116,18 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
return 1;
}
- if (json_str_start == current->buf.ptr.text)
+ if (json_str_start == json_read_buffer->buf.ptr.text)
{
logger_nDPIsrvd(current,
"BUG: Collector connection",
"missing JSON string length in protocol preamble: \"%.*s\"",
NETWORK_BUFFER_LENGTH_DIGITS,
- current->buf.ptr.text);
+ json_read_buffer->buf.ptr.text);
disconnect_client(epollfd, current);
return 1;
}
- if (json_str_start - current->buf.ptr.text != NETWORK_BUFFER_LENGTH_DIGITS)
+ if (json_str_start - json_read_buffer->buf.ptr.text != NETWORK_BUFFER_LENGTH_DIGITS)
{
logger_nDPIsrvd(current,
"BUG: Collector connection",
@@ -994,33 +1135,33 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
"%ld "
"bytes",
NETWORK_BUFFER_LENGTH_DIGITS,
- (long int)(json_str_start - current->buf.ptr.text));
+ (long int)(json_str_start - json_read_buffer->buf.ptr.text));
}
- if (current->event_collector_un.json_bytes > current->buf.max)
+ if (current->event_collector_un.json_bytes > json_read_buffer->buf.max)
{
logger_nDPIsrvd(current,
"BUG: Collector connection",
"JSON string too big: %llu > %zu",
current->event_collector_un.json_bytes,
- current->buf.max);
+ json_read_buffer->buf.max);
disconnect_client(epollfd, current);
return 1;
}
- if (current->event_collector_un.json_bytes > current->buf.used)
+ if (current->event_collector_un.json_bytes > json_read_buffer->buf.used)
{
return 1;
}
- if (current->buf.ptr.text[current->event_collector_un.json_bytes - 2] != '}' ||
- current->buf.ptr.text[current->event_collector_un.json_bytes - 1] != '\n')
+ if (json_read_buffer->buf.ptr.text[current->event_collector_un.json_bytes - 2] != '}' ||
+ json_read_buffer->buf.ptr.text[current->event_collector_un.json_bytes - 1] != '\n')
{
logger_nDPIsrvd(current,
"BUG: Collector connection",
"invalid JSON string: %.*s",
(int)current->event_collector_un.json_bytes,
- current->buf.ptr.text);
+ json_read_buffer->buf.ptr.text);
disconnect_client(epollfd, current);
return 1;
}
@@ -1030,7 +1171,9 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
static int handle_incoming_data(int epollfd, struct remote_desc * const current)
{
- if (current->sock_type != COLLECTOR_UN)
+ struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current);
+
+ if (json_read_buffer == NULL)
{
unsigned char garbage = 0;
@@ -1047,18 +1190,19 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
}
/* read JSON strings (or parts) from the UNIX socket (collecting) */
- if (current->buf.used == current->buf.max)
+ if (json_read_buffer->buf.used == json_read_buffer->buf.max)
{
logger_nDPIsrvd(current,
"Collector connection",
"read buffer (%zu bytes) full. No more read possible.",
- current->buf.max);
+ json_read_buffer->buf.max);
}
else
{
errno = 0;
- ssize_t bytes_read =
- read(current->fd, current->buf.ptr.raw + current->buf.used, current->buf.max - current->buf.used);
+ ssize_t bytes_read = read(current->fd,
+ json_read_buffer->buf.ptr.raw + json_read_buffer->buf.used,
+ json_read_buffer->buf.max - json_read_buffer->buf.used);
if (bytes_read < 0 || errno != 0)
{
logger_nDPIsrvd(current, "Could not read remote", ": %s", strerror(errno));
@@ -1071,10 +1215,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
disconnect_client(epollfd, current);
return 1;
}
- current->buf.used += bytes_read;
+ json_read_buffer->buf.used += bytes_read;
}
- while (current->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1)
+ while (json_read_buffer->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1)
{
if (handle_collector_protocol(epollfd, current) != 0)
{
@@ -1083,19 +1227,18 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
for (size_t i = 0; i < remotes.desc_size; ++i)
{
- if (remotes.desc[i].fd < 0)
- {
- continue;
- }
- if (remotes.desc[i].sock_type != DISTRIBUTOR_UN && remotes.desc[i].sock_type != DISTRIBUTOR_IN)
+ struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(&remotes.desc[i]);
+ UT_array * const additional_write_buffers = get_additional_write_buffers(&remotes.desc[i]);
+
+ if (remotes.desc[i].fd < 0 || write_buffer == NULL || additional_write_buffers == NULL)
{
continue;
}
- if (current->event_collector_un.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used ||
- utarray_len(remotes.desc[i].buf_cache) > 0)
+ if (current->event_collector_un.json_bytes > write_buffer->buf.max - write_buffer->buf.used ||
+ utarray_len(additional_write_buffers) > 0)
{
- if (utarray_len(remotes.desc[i].buf_cache) == 0)
+ if (utarray_len(additional_write_buffers) == 0)
{
#if 0
logger_nDPIsrvd(&remotes.desc[i],
@@ -1114,7 +1257,9 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
continue;
}
}
- if (add_to_cache(&remotes.desc[i], current->buf.ptr.raw, current->event_collector_un.json_bytes) != 0)
+ if (add_to_additional_write_buffers(&remotes.desc[i],
+ json_read_buffer->buf.ptr.raw,
+ current->event_collector_un.json_bytes) != 0)
{
disconnect_client(epollfd, &remotes.desc[i]);
continue;
@@ -1122,10 +1267,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
}
else
{
- memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used,
- current->buf.ptr.raw,
+ memcpy(write_buffer->buf.ptr.raw + write_buffer->buf.used,
+ json_read_buffer->buf.ptr.raw,
current->event_collector_un.json_bytes);
- remotes.desc[i].buf.used += current->event_collector_un.json_bytes;
+ write_buffer->buf.used += current->event_collector_un.json_bytes;
}
if (drain_main_buffer(&remotes.desc[i]) != 0)
@@ -1134,10 +1279,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
}
}
- memmove(current->buf.ptr.raw,
- current->buf.ptr.raw + current->event_collector_un.json_bytes,
- current->buf.used - current->event_collector_un.json_bytes);
- current->buf.used -= current->event_collector_un.json_bytes;
+ memmove(json_read_buffer->buf.ptr.raw,
+ json_read_buffer->buf.ptr.raw + current->event_collector_un.json_bytes,
+ json_read_buffer->buf.used - current->event_collector_un.json_bytes);
+ json_read_buffer->buf.used -= current->event_collector_un.json_bytes;
current->event_collector_un.json_bytes = 0;
}
@@ -1230,7 +1375,7 @@ static int mainloop(int epollfd)
switch (current->sock_type)
{
case COLLECTOR_UN:
- logger_nDPIsrvd(current, "Collector disconnected", "closed");
+ logger_nDPIsrvd(current, "Collector connection", "closed");
break;
case DISTRIBUTOR_UN:
case DISTRIBUTOR_IN: