diff options
author | lns <matzeton@googlemail.com> | 2022-04-16 23:21:24 +0200 |
---|---|---|
committer | lns <matzeton@googlemail.com> | 2022-04-16 23:21:24 +0200 |
commit | c283b89afda98fdcee81fe1b9634f10af1077878 (patch) | |
tree | 725a3496177903258069a2d11536c348722d79b8 | |
parent | db83f82d29df4fd0dfe638fad305366fb265edb8 (diff) |
Refactored buffer subsystem.
Signed-off-by: lns <matzeton@googlemail.com>
-rw-r--r-- | config.h | 2 | ||||
-rw-r--r-- | dependencies/nDPIsrvd.h | 79 | ||||
-rw-r--r-- | nDPId-test.c | 17 | ||||
-rw-r--r-- | nDPIsrvd.c | 353 |
4 files changed, 311 insertions, 140 deletions
@@ -39,6 +39,6 @@ /* nDPIsrvd default config options */ #define nDPIsrvd_PIDFILE "/tmp/ndpisrvd.pid" #define nDPIsrvd_MAX_REMOTE_DESCRIPTORS 32 -#define nDPIsrvd_CACHE_ARRAY_LENGTH 256 +#define nDPIsrvd_MAX_WRITE_BUFFERS 1024 #endif diff --git a/dependencies/nDPIsrvd.h b/dependencies/nDPIsrvd.h index 44bd4703c..05f41335c 100644 --- a/dependencies/nDPIsrvd.h +++ b/dependencies/nDPIsrvd.h @@ -196,6 +196,11 @@ struct nDPIsrvd_buffer } ptr; size_t used; size_t max; +}; + +struct nDPIsrvd_json_buffer +{ + struct nDPIsrvd_buffer buf; char * json_string; size_t json_string_start; nDPIsrvd_ull json_string_length; @@ -221,7 +226,7 @@ struct nDPIsrvd_socket instance_cleanup_callback instance_cleanup_callback; flow_cleanup_callback flow_cleanup_callback; - struct nDPIsrvd_buffer buffer; + struct nDPIsrvd_json_buffer buffer; struct nDPIsrvd_jsmn jsmn; /* easy and fast JSON key/value access via hash table and a static array */ @@ -375,9 +380,6 @@ static inline int nDPIsrvd_buffer_init(struct nDPIsrvd_buffer * const buffer, si return 1; } - buffer->json_string_start = 0; - buffer->json_string_length = 0ull; - buffer->json_string = NULL; buffer->used = 0; buffer->max = buffer_size; @@ -390,6 +392,24 @@ static inline void nDPIsrvd_buffer_free(struct nDPIsrvd_buffer * const buffer) buffer->ptr.raw = NULL; } +static inline int nDPIsrvd_json_buffer_init(struct nDPIsrvd_json_buffer * const json_buffer, size_t json_buffer_size) +{ + int ret = nDPIsrvd_buffer_init(&json_buffer->buf, json_buffer_size); + if (ret == 0) + { + json_buffer->json_string_start = 0ul; + json_buffer->json_string_length = 0ull; + json_buffer->json_string = NULL; + } + + return ret; +} + +static inline void nDPIsrvd_json_buffer_free(struct nDPIsrvd_json_buffer * const json_buffer) +{ + nDPIsrvd_buffer_free(&json_buffer->buf); +} + static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_data_size, size_t instance_user_data_size, size_t thread_user_data_size, @@ -409,7 +429,7 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_d if (sock != NULL) { sock->fd = -1; - if (nDPIsrvd_buffer_init(&sock->buffer, NETWORK_BUFFER_MAX_SIZE) != 0) + if (nDPIsrvd_json_buffer_init(&sock->buffer, NETWORK_BUFFER_MAX_SIZE) != 0) { goto error; } @@ -435,7 +455,7 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_d return sock; error: - nDPIsrvd_buffer_free(&sock->buffer); + nDPIsrvd_json_buffer_free(&sock->buffer); nDPIsrvd_socket_free(&sock); return NULL; } @@ -545,7 +565,7 @@ static inline void nDPIsrvd_socket_free(struct nDPIsrvd_socket ** const sock) } (*sock)->instance_table = NULL; - nDPIsrvd_buffer_free(&(*sock)->buffer); + nDPIsrvd_json_buffer_free(&(*sock)->buffer); nDPIsrvd_free(*sock); *sock = NULL; @@ -641,12 +661,13 @@ static inline enum nDPIsrvd_connect_return nDPIsrvd_connect(struct nDPIsrvd_sock static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * const sock) { - if (sock->buffer.used == sock->buffer.max) + if (sock->buffer.buf.used == sock->buffer.buf.max) { return READ_OK; } - ssize_t bytes_read = read(sock->fd, sock->buffer.ptr.raw + sock->buffer.used, sock->buffer.max - sock->buffer.used); + ssize_t bytes_read = + read(sock->fd, sock->buffer.buf.ptr.raw + sock->buffer.buf.used, sock->buffer.buf.max - sock->buffer.buf.used); if (bytes_read == 0) { @@ -657,7 +678,7 @@ static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * c return READ_ERROR; } - sock->buffer.used += bytes_read; + sock->buffer.buf.used += bytes_read; return READ_OK; } @@ -1080,49 +1101,49 @@ static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock, return 0; } -static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_buffer * const buffer, +static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_json_buffer * const json_buffer, struct nDPIsrvd_jsmn * const jsmn) { - if (buffer->used < NETWORK_BUFFER_LENGTH_DIGITS + 1) + if (json_buffer->buf.used < NETWORK_BUFFER_LENGTH_DIGITS + 1) { return PARSE_NEED_MORE_DATA; } - if (buffer->ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{') + if (json_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{') { return PARSE_INVALID_OPENING_CHAR; } errno = 0; - buffer->json_string_length = strtoull((const char *)buffer->ptr.text, &buffer->json_string, 10); - buffer->json_string_length += buffer->json_string - buffer->ptr.text; - buffer->json_string_start = buffer->json_string - buffer->ptr.text; + json_buffer->json_string_length = strtoull((const char *)json_buffer->buf.ptr.text, &json_buffer->json_string, 10); + json_buffer->json_string_length += json_buffer->json_string - json_buffer->buf.ptr.text; + json_buffer->json_string_start = json_buffer->json_string - json_buffer->buf.ptr.text; if (errno == ERANGE) { return PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT; } - if (buffer->json_string == buffer->ptr.text) + if (json_buffer->json_string == json_buffer->buf.ptr.text) { return PARSE_SIZE_MISSING; } - if (buffer->json_string_length > buffer->max) + if (json_buffer->json_string_length > json_buffer->buf.max) { return PARSE_STRING_TOO_BIG; } - if (buffer->json_string_length > buffer->used) + if (json_buffer->json_string_length > json_buffer->buf.used) { return PARSE_NEED_MORE_DATA; } - if (buffer->ptr.text[buffer->json_string_length - 2] != '}' || - buffer->ptr.text[buffer->json_string_length - 1] != '\n') + if (json_buffer->buf.ptr.text[json_buffer->json_string_length - 2] != '}' || + json_buffer->buf.ptr.text[json_buffer->json_string_length - 1] != '\n') { return PARSE_INVALID_CLOSING_CHAR; } jsmn_init(&jsmn->parser); jsmn->tokens_found = jsmn_parse(&jsmn->parser, - buffer->ptr.text + buffer->json_string_start, - buffer->json_string_length - buffer->json_string_start, + json_buffer->buf.ptr.text + json_buffer->json_string_start, + json_buffer->json_string_length - json_buffer->json_string_start, jsmn->tokens, nDPIsrvd_MAX_JSON_TOKENS); if (jsmn->tokens_found < 0 || jsmn->tokens[0].type != JSMN_OBJECT) @@ -1143,12 +1164,14 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_buf return PARSE_OK; } -static void nDPIsrvd_drain_buffer(struct nDPIsrvd_buffer * const buffer) +static void nDPIsrvd_drain_buffer(struct nDPIsrvd_json_buffer * const json_buffer) { - memmove(buffer->ptr.raw, buffer->ptr.raw + buffer->json_string_length, buffer->used - buffer->json_string_length); - buffer->used -= buffer->json_string_length; - buffer->json_string_length = 0; - buffer->json_string_start = 0; + memmove(json_buffer->buf.ptr.raw, + json_buffer->buf.ptr.raw + json_buffer->json_string_length, + json_buffer->buf.used - json_buffer->json_string_length); + json_buffer->buf.used -= json_buffer->json_string_length; + json_buffer->json_string_length = 0; + json_buffer->json_string_start = 0; } static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_socket * const sock) diff --git a/nDPId-test.c b/nDPId-test.c index 90f02ae65..03d69c237 100644 --- a/nDPId-test.c +++ b/nDPId-test.c @@ -255,15 +255,15 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) error: if (mock_test_desc != NULL) { - drain_cache_blocking(mock_test_desc); + drain_write_buffers_blocking(mock_test_desc); } if (mock_null_desc != NULL) { - drain_cache_blocking(mock_null_desc); + drain_write_buffers_blocking(mock_null_desc); } if (mock_arpa_desc != NULL) { - drain_cache_blocking(mock_arpa_desc); + drain_write_buffers_blocking(mock_arpa_desc); } del_event(epollfd, mock_pipefds[PIPE_nDPIsrvd]); @@ -640,7 +640,7 @@ static void * distributor_client_mainloop_thread(void * const arg) "Problematic JSON string (start: %zu, length: %llu, buffer usage: %zu): %.*s", mock_sock->buffer.json_string_start, mock_sock->buffer.json_string_length, - mock_sock->buffer.used, + mock_sock->buffer.buf.used, (int)mock_sock->buffer.json_string_length, mock_sock->buffer.json_string); THREAD_ERROR_GOTO(trv); @@ -992,13 +992,15 @@ int main(int argc, char ** argv) unsigned long long int total_alloc_bytes = #ifdef ENABLE_ZLIB - (unsigned long long int)(ndpi_memory_alloc_bytes - zlib_compression_bytes - (zlib_compressions * sizeof(struct nDPId_detection_data))); + (unsigned long long int)(ndpi_memory_alloc_bytes - zlib_compression_bytes - + (zlib_compressions * sizeof(struct nDPId_detection_data))); #else (unsigned long long int)ndpi_memory_alloc_bytes; #endif unsigned long long int total_free_bytes = #ifdef ENABLE_ZLIB - (unsigned long long int)(ndpi_memory_free_bytes - zlib_compression_bytes - (zlib_compressions * sizeof(struct nDPId_detection_data))); + (unsigned long long int)(ndpi_memory_free_bytes - zlib_compression_bytes - + (zlib_compressions * sizeof(struct nDPId_detection_data))); #else (unsigned long long int)ndpi_memory_free_bytes; #endif @@ -1028,7 +1030,8 @@ int main(int argc, char ** argv) total_free_bytes - sizeof(struct nDPId_workflow) * nDPId_options.reader_thread_count /* We do not want to take the workflow into account. */, - total_alloc_count, total_free_count); + total_alloc_count, + total_free_count); printf( "~~ json string min len.......: %llu chars\n" diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 05d3287e5..437ef5616 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -26,12 +26,17 @@ enum sock_type DISTRIBUTOR_IN, }; +struct nDPIsrvd_write_buffer +{ + struct nDPIsrvd_buffer buf; + size_t written; +}; + struct remote_desc { enum sock_type sock_type; int fd; - struct nDPIsrvd_buffer buf; - UT_array * buf_cache; + union { struct @@ -40,6 +45,8 @@ struct remote_desc struct sockaddr_un peer; unsigned long long int json_bytes; pid_t pid; + + struct nDPIsrvd_json_buffer main_read_buffer; } event_collector_un; struct { @@ -47,12 +54,18 @@ struct remote_desc struct sockaddr_un peer; pid_t pid; char * user_name; + + struct nDPIsrvd_write_buffer main_write_buffer; + UT_array * additional_write_buffers; } event_distributor_un; /* UNIX socket */ struct { int distributor_sockfd; struct sockaddr_in peer; char peer_addr[INET_ADDRSTRLEN]; + + struct nDPIsrvd_write_buffer main_write_buffer; + UT_array * additional_write_buffers; } event_distributor_in; /* TCP/IP socket */ }; }; @@ -81,11 +94,11 @@ static struct nDPIsrvd_ull max_remote_descriptors; char * user; char * group; - nDPIsrvd_ull cache_array_length; - int cache_fallback_to_blocking; + nDPIsrvd_ull max_write_buffers; + int bufferbloat_fallback_to_blocking; } nDPIsrvd_options = {.max_remote_descriptors = nDPIsrvd_MAX_REMOTE_DESCRIPTORS, - .cache_array_length = nDPIsrvd_CACHE_ARRAY_LENGTH, - .cache_fallback_to_blocking = 1}; + .max_write_buffers = nDPIsrvd_MAX_WRITE_BUFFERS, + .bufferbloat_fallback_to_blocking = 1}; static void logger_nDPIsrvd(struct remote_desc const * const remote, char const * const prefix, @@ -97,34 +110,32 @@ static int add_in_event_fd(int epollfd, int fd); static int add_in_event(int epollfd, struct remote_desc * const remote); static int del_out_event(int epollfd, struct remote_desc * const remote); static void disconnect_client(int epollfd, struct remote_desc * const current); -static int drain_cache_blocking(struct remote_desc * const remote); +static int drain_write_buffers_blocking(struct remote_desc * const remote); static void nDPIsrvd_buffer_array_copy(void * dst, const void * src) { - struct nDPIsrvd_buffer * const buf_dst = (struct nDPIsrvd_buffer *)dst; - struct nDPIsrvd_buffer const * const buf_src = (struct nDPIsrvd_buffer *)src; + struct nDPIsrvd_write_buffer * const buf_dst = (struct nDPIsrvd_write_buffer *)dst; + struct nDPIsrvd_write_buffer const * const buf_src = (struct nDPIsrvd_write_buffer *)src; - buf_dst->ptr.raw = NULL; - if (nDPIsrvd_buffer_init(buf_dst, buf_src->used) != 0) + buf_dst->buf.ptr.raw = NULL; + if (nDPIsrvd_buffer_init(&buf_dst->buf, buf_src->buf.used) != 0) { return; } - buf_dst->json_string_start = buf_src->json_string_start; - buf_dst->json_string_length = buf_src->json_string_length; - buf_dst->json_string = buf_src->json_string; - buf_dst->used = buf_src->used; - memcpy(buf_dst->ptr.raw, buf_src->ptr.raw, buf_src->used); + buf_dst->written = buf_src->written; + buf_dst->buf.used = buf_src->buf.used; + memcpy(buf_dst->buf.ptr.raw, buf_src->buf.ptr.raw, buf_src->buf.used); } static void nDPIsrvd_buffer_array_dtor(void * elt) { - struct nDPIsrvd_buffer * const buf = (struct nDPIsrvd_buffer *)elt; + struct nDPIsrvd_write_buffer * const buf_dst = (struct nDPIsrvd_write_buffer *)elt; - nDPIsrvd_buffer_free(buf); + nDPIsrvd_buffer_free(&buf_dst->buf); } -static const UT_icd nDPIsrvd_buffer_array_icd = {sizeof(struct nDPIsrvd_buffer), +static const UT_icd nDPIsrvd_buffer_array_icd = {sizeof(struct nDPIsrvd_write_buffer), NULL, nDPIsrvd_buffer_array_copy, nDPIsrvd_buffer_array_dtor}; @@ -142,36 +153,93 @@ void nDPIsrvd_memprof_log(char const * const format, ...) #endif #endif -static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, nDPIsrvd_ull json_string_length) +static struct nDPIsrvd_json_buffer * get_read_buffer(struct remote_desc * const remote) +{ + switch (remote->sock_type) + { + case COLLECTOR_UN: + return &remote->event_collector_un.main_read_buffer; + + case DISTRIBUTOR_UN: + case DISTRIBUTOR_IN: + return NULL; + } + + return NULL; +} + +static struct nDPIsrvd_write_buffer * get_write_buffer(struct remote_desc * const remote) +{ + switch (remote->sock_type) + { + case COLLECTOR_UN: + return NULL; + + case DISTRIBUTOR_UN: + return &remote->event_distributor_un.main_write_buffer; + + case DISTRIBUTOR_IN: + return &remote->event_distributor_in.main_write_buffer; + } + + return NULL; +} + +static UT_array * get_additional_write_buffers(struct remote_desc * const remote) { - struct nDPIsrvd_buffer buf_src = {}; + switch (remote->sock_type) + { + case COLLECTOR_UN: + return NULL; - if (utarray_len(remote->buf_cache) >= nDPIsrvd_options.cache_array_length) + case DISTRIBUTOR_UN: + return remote->event_distributor_un.additional_write_buffers; + + case DISTRIBUTOR_IN: + return remote->event_distributor_in.additional_write_buffers; + } + + return NULL; +} + +static int add_to_additional_write_buffers(struct remote_desc * const remote, + uint8_t * const buf, + nDPIsrvd_ull json_string_length) +{ + struct nDPIsrvd_write_buffer buf_src = {}; + UT_array * const additional_write_buffers = get_additional_write_buffers(remote); + + if (additional_write_buffers == NULL) { - if (nDPIsrvd_options.cache_fallback_to_blocking == 0) + return -1; + } + + if (utarray_len(additional_write_buffers) >= nDPIsrvd_options.max_write_buffers) + { + if (nDPIsrvd_options.bufferbloat_fallback_to_blocking == 0) { logger_nDPIsrvd(remote, - "Buffer cache limit for", + "Buffer limit for", "for reached, remote too slow: %u lines", - utarray_len(remote->buf_cache)); + utarray_len(additional_write_buffers)); return -1; } else { logger_nDPIsrvd(remote, - "Buffer JSON string cache limit for", + "Buffer limit for", "reached, falling back to blocking I/O: %u lines", - utarray_len(remote->buf_cache)); - if (drain_cache_blocking(remote) != 0) + utarray_len(additional_write_buffers)); + if (drain_write_buffers_blocking(remote) != 0) { return -1; } } } - buf_src.ptr.raw = buf; - buf_src.used = buf_src.max = buf_src.json_string_length = json_string_length; - utarray_push_back(remote->buf_cache, &buf_src); + buf_src.buf.ptr.raw = buf; + buf_src.buf.used = buf_src.buf.max = json_string_length; + utarray_push_back(additional_write_buffers, &buf_src); return 0; } @@ -216,13 +284,20 @@ static void logger_nDPIsrvd(struct remote_desc const * const remote, static int drain_main_buffer(struct remote_desc * const remote) { - if (remote->buf.used == 0) + struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(remote); + + if (write_buffer == NULL) + { + return -1; + } + + if (write_buffer->buf.used == 0) { return 0; } errno = 0; - ssize_t bytes_written = write(remote->fd, remote->buf.ptr.raw, remote->buf.used); + ssize_t bytes_written = write(remote->fd, write_buffer->buf.ptr.raw, write_buffer->buf.used); if (errno == EAGAIN) { return 0; @@ -237,32 +312,37 @@ static int drain_main_buffer(struct remote_desc * const remote) logger_nDPIsrvd(remote, "Distributor connection", "closed"); return -1; } - if ((size_t)bytes_written < remote->buf.used) + if ((size_t)bytes_written < write_buffer->buf.used) { #if 0 logger_nDPIsrvd( remote, "Distributor", "wrote less than expected: %zd < %zu", bytes_written, remote->buf.used); #endif - memmove(remote->buf.ptr.raw, remote->buf.ptr.raw + bytes_written, remote->buf.used - bytes_written); + memmove(write_buffer->buf.ptr.raw, + write_buffer->buf.ptr.raw + bytes_written, + write_buffer->buf.used - bytes_written); } - remote->buf.used -= bytes_written; + write_buffer->buf.used -= bytes_written; return 0; } -static int drain_cache(struct remote_desc * const remote) +static int drain_write_buffers(struct remote_desc * const remote) { + UT_array * const additional_write_buffers = get_additional_write_buffers(remote); + errno = 0; - if (drain_main_buffer(remote) != 0) + if (drain_main_buffer(remote) != 0 || additional_write_buffers == NULL) { return -1; } - while (utarray_len(remote->buf_cache) > 0) + while (utarray_len(additional_write_buffers) > 0) { - struct nDPIsrvd_buffer * buf = (struct nDPIsrvd_buffer *)utarray_front(remote->buf_cache); - ssize_t written = write(remote->fd, buf->ptr.raw + buf->json_string_start, buf->json_string_length); + struct nDPIsrvd_write_buffer * buf = (struct nDPIsrvd_write_buffer *)utarray_front(additional_write_buffers); + ssize_t written = write(remote->fd, buf->buf.ptr.raw + buf->written, buf->buf.used - buf->written); + switch (written) { case -1: @@ -274,11 +354,10 @@ static int drain_cache(struct remote_desc * const remote) case 0: return -1; default: - buf->json_string_start += written; - buf->json_string_length -= written; - if (buf->json_string_length == 0) + buf->written += written; + if (buf->written == buf->buf.max) { - utarray_erase(remote->buf_cache, 0, 1); + utarray_erase(additional_write_buffers, 0, 1); } break; } @@ -287,7 +366,7 @@ static int drain_cache(struct remote_desc * const remote) return 0; } -static int drain_cache_blocking(struct remote_desc * const remote) +static int drain_write_buffers_blocking(struct remote_desc * const remote) { int retval = 0; @@ -296,9 +375,9 @@ static int drain_cache_blocking(struct remote_desc * const remote) logger_nDPIsrvd(remote, "Error setting distributor", "fd flags to blocking mode: %s", strerror(errno)); return -1; } - if (drain_cache(remote) != 0) + if (drain_write_buffers(remote) != 0) { - logger_nDPIsrvd(remote, "Could not drain buffer cache for", "in blocking I/O: %s", strerror(errno)); + logger_nDPIsrvd(remote, "Could not drain buffers for", "in blocking I/O: %s", strerror(errno)); retval = -1; } if (fcntl_add_flags(remote->fd, O_NONBLOCK) != 0) @@ -312,17 +391,19 @@ static int drain_cache_blocking(struct remote_desc * const remote) static int handle_outgoing_data(int epollfd, struct remote_desc * const remote) { - if (remote->sock_type != DISTRIBUTOR_UN && remote->sock_type != DISTRIBUTOR_IN) + UT_array * const additional_write_buffers = get_additional_write_buffers(remote); + + if (additional_write_buffers == NULL) { return -1; } - if (drain_cache(remote) != 0) + if (drain_write_buffers(remote) != 0) { - logger_nDPIsrvd(remote, "Could not drain buffer cache for", ": %s", strerror(errno)); + logger_nDPIsrvd(remote, "Could not drain buffers for", ": %s", strerror(errno)); disconnect_client(epollfd, remote); return -1; } - if (utarray_len(remote->buf_cache) == 0) + if (utarray_len(additional_write_buffers) == 0) { return del_out_event(epollfd, remote); } @@ -517,14 +598,42 @@ static struct remote_desc * get_remote_descriptor(enum sock_type type, int remot if (remotes.desc[i].fd == -1) { remotes.desc_used++; - if (remotes.desc[i].buf_cache == NULL) + + struct nDPIsrvd_write_buffer * write_buffer = NULL; + UT_array ** additional_write_buffers = NULL; + + switch (type) { - utarray_new(remotes.desc[i].buf_cache, &nDPIsrvd_buffer_array_icd); + case COLLECTOR_UN: + if (nDPIsrvd_json_buffer_init(&remotes.desc[i].event_collector_un.main_read_buffer, + max_buffer_size) != 0) + { + return NULL; + } + break; + case DISTRIBUTOR_UN: + write_buffer = &remotes.desc[i].event_distributor_un.main_write_buffer; + additional_write_buffers = &remotes.desc[i].event_distributor_un.additional_write_buffers; + break; + case DISTRIBUTOR_IN: + write_buffer = &remotes.desc[i].event_distributor_in.main_write_buffer; + additional_write_buffers = &remotes.desc[i].event_distributor_in.additional_write_buffers; + break; } - if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, max_buffer_size) != 0 || remotes.desc[i].buf_cache == NULL) + + if (additional_write_buffers != NULL && *additional_write_buffers == NULL) + { + utarray_new(*additional_write_buffers, &nDPIsrvd_buffer_array_icd); + if (*additional_write_buffers == NULL) + { + return NULL; + } + } + if (write_buffer != NULL && nDPIsrvd_buffer_init(&write_buffer->buf, max_buffer_size) != 0) { return NULL; } + remotes.desc[i].sock_type = type; remotes.desc[i].fd = remote_fd; return &remotes.desc[i]; @@ -538,12 +647,28 @@ static void free_remotes(void) { for (size_t i = 0; i < remotes.desc_size; ++i) { - if (remotes.desc[i].buf_cache != NULL) + switch (remotes.desc[i].sock_type) { - utarray_free(remotes.desc[i].buf_cache); - remotes.desc[i].buf_cache = NULL; + case COLLECTOR_UN: + nDPIsrvd_json_buffer_free(&remotes.desc[i].event_collector_un.main_read_buffer); + break; + case DISTRIBUTOR_UN: + if (remotes.desc[i].event_distributor_un.additional_write_buffers != NULL) + { + utarray_free(remotes.desc[i].event_distributor_un.additional_write_buffers); + remotes.desc[i].event_distributor_un.additional_write_buffers = NULL; + } + nDPIsrvd_buffer_free(&remotes.desc[i].event_distributor_un.main_write_buffer.buf); + break; + case DISTRIBUTOR_IN: + if (remotes.desc[i].event_distributor_in.additional_write_buffers != NULL) + { + utarray_free(remotes.desc[i].event_distributor_in.additional_write_buffers); + remotes.desc[i].event_distributor_in.additional_write_buffers = NULL; + } + nDPIsrvd_buffer_free(&remotes.desc[i].event_distributor_in.main_write_buffer.buf); + break; } - nDPIsrvd_buffer_free(&remotes.desc[i].buf); } } @@ -610,10 +735,25 @@ static void disconnect_client(int epollfd, struct remote_desc * const current) { case COLLECTOR_UN: logger_nDPIsrvd(current, "Error closing collector connection", ": %s", strerror(errno)); + nDPIsrvd_json_buffer_free(¤t->event_collector_un.main_read_buffer); break; case DISTRIBUTOR_UN: + logger_nDPIsrvd(current, "Error closing distributor connection", ": %s", strerror(errno)); + if (current->event_distributor_un.additional_write_buffers != NULL) + { + utarray_clear(current->event_distributor_un.additional_write_buffers); + } + nDPIsrvd_buffer_free(¤t->event_distributor_un.main_write_buffer.buf); + current->event_distributor_un.main_write_buffer.written = 0; + break; case DISTRIBUTOR_IN: logger_nDPIsrvd(current, "Error closing distributor connection", ": %s", strerror(errno)); + if (current->event_distributor_in.additional_write_buffers != NULL) + { + utarray_clear(current->event_distributor_in.additional_write_buffers); + } + nDPIsrvd_buffer_free(¤t->event_distributor_in.main_write_buffer.buf); + current->event_distributor_in.main_write_buffer.written = 0; break; } } @@ -625,11 +765,6 @@ static void disconnect_client(int epollfd, struct remote_desc * const current) current->fd = -1; remotes.desc_used--; } - if (current->buf_cache != NULL) - { - utarray_clear(current->buf_cache); - } - nDPIsrvd_buffer_free(¤t->buf); } static int nDPIsrvd_parse_options(int argc, char ** argv) @@ -684,14 +819,14 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) nDPIsrvd_options.group = strdup(optarg); break; case 'C': - if (str_value_to_ull(optarg, &nDPIsrvd_options.cache_array_length) != CONVERSION_OK) + if (str_value_to_ull(optarg, &nDPIsrvd_options.max_write_buffers) != CONVERSION_OK) { fprintf(stderr, "%s: Argument for `-C' is not a number: %s\n", argv[0], optarg); return 1; } break; case 'D': - nDPIsrvd_options.cache_fallback_to_blocking = 0; + nDPIsrvd_options.bufferbloat_fallback_to_blocking = 0; break; case 'v': fprintf(stderr, "%s", get_nDPId_version()); @@ -952,21 +1087,27 @@ static int new_connection(int epollfd, int eventfd) static int handle_collector_protocol(int epollfd, struct remote_desc * const current) { + struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current); char * json_str_start = NULL; - if (current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{') + if (json_read_buffer == NULL) + { + return 1; + } + + if (json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{') { logger_nDPIsrvd(current, "BUG: Collector connection", "JSON invalid opening character: '%c'", - current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]); + json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]); disconnect_client(epollfd, current); return 1; } errno = 0; - current->event_collector_un.json_bytes = strtoull((char *)current->buf.ptr.text, &json_str_start, 10); - current->event_collector_un.json_bytes += json_str_start - current->buf.ptr.text; + current->event_collector_un.json_bytes = strtoull(json_read_buffer->buf.ptr.text, &json_str_start, 10); + current->event_collector_un.json_bytes += json_str_start - json_read_buffer->buf.ptr.text; if (errno == ERANGE) { @@ -975,18 +1116,18 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur return 1; } - if (json_str_start == current->buf.ptr.text) + if (json_str_start == json_read_buffer->buf.ptr.text) { logger_nDPIsrvd(current, "BUG: Collector connection", "missing JSON string length in protocol preamble: \"%.*s\"", NETWORK_BUFFER_LENGTH_DIGITS, - current->buf.ptr.text); + json_read_buffer->buf.ptr.text); disconnect_client(epollfd, current); return 1; } - if (json_str_start - current->buf.ptr.text != NETWORK_BUFFER_LENGTH_DIGITS) + if (json_str_start - json_read_buffer->buf.ptr.text != NETWORK_BUFFER_LENGTH_DIGITS) { logger_nDPIsrvd(current, "BUG: Collector connection", @@ -994,33 +1135,33 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur "%ld " "bytes", NETWORK_BUFFER_LENGTH_DIGITS, - (long int)(json_str_start - current->buf.ptr.text)); + (long int)(json_str_start - json_read_buffer->buf.ptr.text)); } - if (current->event_collector_un.json_bytes > current->buf.max) + if (current->event_collector_un.json_bytes > json_read_buffer->buf.max) { logger_nDPIsrvd(current, "BUG: Collector connection", "JSON string too big: %llu > %zu", current->event_collector_un.json_bytes, - current->buf.max); + json_read_buffer->buf.max); disconnect_client(epollfd, current); return 1; } - if (current->event_collector_un.json_bytes > current->buf.used) + if (current->event_collector_un.json_bytes > json_read_buffer->buf.used) { return 1; } - if (current->buf.ptr.text[current->event_collector_un.json_bytes - 2] != '}' || - current->buf.ptr.text[current->event_collector_un.json_bytes - 1] != '\n') + if (json_read_buffer->buf.ptr.text[current->event_collector_un.json_bytes - 2] != '}' || + json_read_buffer->buf.ptr.text[current->event_collector_un.json_bytes - 1] != '\n') { logger_nDPIsrvd(current, "BUG: Collector connection", "invalid JSON string: %.*s", (int)current->event_collector_un.json_bytes, - current->buf.ptr.text); + json_read_buffer->buf.ptr.text); disconnect_client(epollfd, current); return 1; } @@ -1030,7 +1171,9 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur static int handle_incoming_data(int epollfd, struct remote_desc * const current) { - if (current->sock_type != COLLECTOR_UN) + struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current); + + if (json_read_buffer == NULL) { unsigned char garbage = 0; @@ -1047,18 +1190,19 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) } /* read JSON strings (or parts) from the UNIX socket (collecting) */ - if (current->buf.used == current->buf.max) + if (json_read_buffer->buf.used == json_read_buffer->buf.max) { logger_nDPIsrvd(current, "Collector connection", "read buffer (%zu bytes) full. No more read possible.", - current->buf.max); + json_read_buffer->buf.max); } else { errno = 0; - ssize_t bytes_read = - read(current->fd, current->buf.ptr.raw + current->buf.used, current->buf.max - current->buf.used); + ssize_t bytes_read = read(current->fd, + json_read_buffer->buf.ptr.raw + json_read_buffer->buf.used, + json_read_buffer->buf.max - json_read_buffer->buf.used); if (bytes_read < 0 || errno != 0) { logger_nDPIsrvd(current, "Could not read remote", ": %s", strerror(errno)); @@ -1071,10 +1215,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) disconnect_client(epollfd, current); return 1; } - current->buf.used += bytes_read; + json_read_buffer->buf.used += bytes_read; } - while (current->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1) + while (json_read_buffer->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1) { if (handle_collector_protocol(epollfd, current) != 0) { @@ -1083,19 +1227,18 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) for (size_t i = 0; i < remotes.desc_size; ++i) { - if (remotes.desc[i].fd < 0) - { - continue; - } - if (remotes.desc[i].sock_type != DISTRIBUTOR_UN && remotes.desc[i].sock_type != DISTRIBUTOR_IN) + struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(&remotes.desc[i]); + UT_array * const additional_write_buffers = get_additional_write_buffers(&remotes.desc[i]); + + if (remotes.desc[i].fd < 0 || write_buffer == NULL || additional_write_buffers == NULL) { continue; } - if (current->event_collector_un.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used || - utarray_len(remotes.desc[i].buf_cache) > 0) + if (current->event_collector_un.json_bytes > write_buffer->buf.max - write_buffer->buf.used || + utarray_len(additional_write_buffers) > 0) { - if (utarray_len(remotes.desc[i].buf_cache) == 0) + if (utarray_len(additional_write_buffers) == 0) { #if 0 logger_nDPIsrvd(&remotes.desc[i], @@ -1114,7 +1257,9 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) continue; } } - if (add_to_cache(&remotes.desc[i], current->buf.ptr.raw, current->event_collector_un.json_bytes) != 0) + if (add_to_additional_write_buffers(&remotes.desc[i], + json_read_buffer->buf.ptr.raw, + current->event_collector_un.json_bytes) != 0) { disconnect_client(epollfd, &remotes.desc[i]); continue; @@ -1122,10 +1267,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) } else { - memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used, - current->buf.ptr.raw, + memcpy(write_buffer->buf.ptr.raw + write_buffer->buf.used, + json_read_buffer->buf.ptr.raw, current->event_collector_un.json_bytes); - remotes.desc[i].buf.used += current->event_collector_un.json_bytes; + write_buffer->buf.used += current->event_collector_un.json_bytes; } if (drain_main_buffer(&remotes.desc[i]) != 0) @@ -1134,10 +1279,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) } } - memmove(current->buf.ptr.raw, - current->buf.ptr.raw + current->event_collector_un.json_bytes, - current->buf.used - current->event_collector_un.json_bytes); - current->buf.used -= current->event_collector_un.json_bytes; + memmove(json_read_buffer->buf.ptr.raw, + json_read_buffer->buf.ptr.raw + current->event_collector_un.json_bytes, + json_read_buffer->buf.used - current->event_collector_un.json_bytes); + json_read_buffer->buf.used -= current->event_collector_un.json_bytes; current->event_collector_un.json_bytes = 0; } @@ -1230,7 +1375,7 @@ static int mainloop(int epollfd) switch (current->sock_type) { case COLLECTOR_UN: - logger_nDPIsrvd(current, "Collector disconnected", "closed"); + logger_nDPIsrvd(current, "Collector connection", "closed"); break; case DISTRIBUTOR_UN: case DISTRIBUTOR_IN: |