diff options
author | lns <matzeton@googlemail.com> | 2022-04-16 23:21:24 +0200 |
---|---|---|
committer | lns <matzeton@googlemail.com> | 2022-04-16 23:21:24 +0200 |
commit | c283b89afda98fdcee81fe1b9634f10af1077878 (patch) | |
tree | 725a3496177903258069a2d11536c348722d79b8 /nDPIsrvd.c | |
parent | db83f82d29df4fd0dfe638fad305366fb265edb8 (diff) |
Refactored buffer subsystem.
Signed-off-by: lns <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r-- | nDPIsrvd.c | 353 |
1 files changed, 249 insertions, 104 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 05d3287e5..437ef5616 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -26,12 +26,17 @@ enum sock_type DISTRIBUTOR_IN, }; +struct nDPIsrvd_write_buffer +{ + struct nDPIsrvd_buffer buf; + size_t written; +}; + struct remote_desc { enum sock_type sock_type; int fd; - struct nDPIsrvd_buffer buf; - UT_array * buf_cache; + union { struct @@ -40,6 +45,8 @@ struct remote_desc struct sockaddr_un peer; unsigned long long int json_bytes; pid_t pid; + + struct nDPIsrvd_json_buffer main_read_buffer; } event_collector_un; struct { @@ -47,12 +54,18 @@ struct remote_desc struct sockaddr_un peer; pid_t pid; char * user_name; + + struct nDPIsrvd_write_buffer main_write_buffer; + UT_array * additional_write_buffers; } event_distributor_un; /* UNIX socket */ struct { int distributor_sockfd; struct sockaddr_in peer; char peer_addr[INET_ADDRSTRLEN]; + + struct nDPIsrvd_write_buffer main_write_buffer; + UT_array * additional_write_buffers; } event_distributor_in; /* TCP/IP socket */ }; }; @@ -81,11 +94,11 @@ static struct nDPIsrvd_ull max_remote_descriptors; char * user; char * group; - nDPIsrvd_ull cache_array_length; - int cache_fallback_to_blocking; + nDPIsrvd_ull max_write_buffers; + int bufferbloat_fallback_to_blocking; } nDPIsrvd_options = {.max_remote_descriptors = nDPIsrvd_MAX_REMOTE_DESCRIPTORS, - .cache_array_length = nDPIsrvd_CACHE_ARRAY_LENGTH, - .cache_fallback_to_blocking = 1}; + .max_write_buffers = nDPIsrvd_MAX_WRITE_BUFFERS, + .bufferbloat_fallback_to_blocking = 1}; static void logger_nDPIsrvd(struct remote_desc const * const remote, char const * const prefix, @@ -97,34 +110,32 @@ static int add_in_event_fd(int epollfd, int fd); static int add_in_event(int epollfd, struct remote_desc * const remote); static int del_out_event(int epollfd, struct remote_desc * const remote); static void disconnect_client(int epollfd, struct remote_desc * const current); -static int drain_cache_blocking(struct remote_desc * const remote); +static int drain_write_buffers_blocking(struct remote_desc * const remote); static void nDPIsrvd_buffer_array_copy(void * dst, const void * src) { - struct nDPIsrvd_buffer * const buf_dst = (struct nDPIsrvd_buffer *)dst; - struct nDPIsrvd_buffer const * const buf_src = (struct nDPIsrvd_buffer *)src; + struct nDPIsrvd_write_buffer * const buf_dst = (struct nDPIsrvd_write_buffer *)dst; + struct nDPIsrvd_write_buffer const * const buf_src = (struct nDPIsrvd_write_buffer *)src; - buf_dst->ptr.raw = NULL; - if (nDPIsrvd_buffer_init(buf_dst, buf_src->used) != 0) + buf_dst->buf.ptr.raw = NULL; + if (nDPIsrvd_buffer_init(&buf_dst->buf, buf_src->buf.used) != 0) { return; } - buf_dst->json_string_start = buf_src->json_string_start; - buf_dst->json_string_length = buf_src->json_string_length; - buf_dst->json_string = buf_src->json_string; - buf_dst->used = buf_src->used; - memcpy(buf_dst->ptr.raw, buf_src->ptr.raw, buf_src->used); + buf_dst->written = buf_src->written; + buf_dst->buf.used = buf_src->buf.used; + memcpy(buf_dst->buf.ptr.raw, buf_src->buf.ptr.raw, buf_src->buf.used); } static void nDPIsrvd_buffer_array_dtor(void * elt) { - struct nDPIsrvd_buffer * const buf = (struct nDPIsrvd_buffer *)elt; + struct nDPIsrvd_write_buffer * const buf_dst = (struct nDPIsrvd_write_buffer *)elt; - nDPIsrvd_buffer_free(buf); + nDPIsrvd_buffer_free(&buf_dst->buf); } -static const UT_icd nDPIsrvd_buffer_array_icd = {sizeof(struct nDPIsrvd_buffer), +static const UT_icd nDPIsrvd_buffer_array_icd = {sizeof(struct nDPIsrvd_write_buffer), NULL, nDPIsrvd_buffer_array_copy, nDPIsrvd_buffer_array_dtor}; @@ -142,36 +153,93 @@ void nDPIsrvd_memprof_log(char const * const format, ...) #endif #endif -static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, nDPIsrvd_ull json_string_length) +static struct nDPIsrvd_json_buffer * get_read_buffer(struct remote_desc * const remote) +{ + switch (remote->sock_type) + { + case COLLECTOR_UN: + return &remote->event_collector_un.main_read_buffer; + + case DISTRIBUTOR_UN: + case DISTRIBUTOR_IN: + return NULL; + } + + return NULL; +} + +static struct nDPIsrvd_write_buffer * get_write_buffer(struct remote_desc * const remote) +{ + switch (remote->sock_type) + { + case COLLECTOR_UN: + return NULL; + + case DISTRIBUTOR_UN: + return &remote->event_distributor_un.main_write_buffer; + + case DISTRIBUTOR_IN: + return &remote->event_distributor_in.main_write_buffer; + } + + return NULL; +} + +static UT_array * get_additional_write_buffers(struct remote_desc * const remote) { - struct nDPIsrvd_buffer buf_src = {}; + switch (remote->sock_type) + { + case COLLECTOR_UN: + return NULL; - if (utarray_len(remote->buf_cache) >= nDPIsrvd_options.cache_array_length) + case DISTRIBUTOR_UN: + return remote->event_distributor_un.additional_write_buffers; + + case DISTRIBUTOR_IN: + return remote->event_distributor_in.additional_write_buffers; + } + + return NULL; +} + +static int add_to_additional_write_buffers(struct remote_desc * const remote, + uint8_t * const buf, + nDPIsrvd_ull json_string_length) +{ + struct nDPIsrvd_write_buffer buf_src = {}; + UT_array * const additional_write_buffers = get_additional_write_buffers(remote); + + if (additional_write_buffers == NULL) { - if (nDPIsrvd_options.cache_fallback_to_blocking == 0) + return -1; + } + + if (utarray_len(additional_write_buffers) >= nDPIsrvd_options.max_write_buffers) + { + if (nDPIsrvd_options.bufferbloat_fallback_to_blocking == 0) { logger_nDPIsrvd(remote, - "Buffer cache limit for", + "Buffer limit for", "for reached, remote too slow: %u lines", - utarray_len(remote->buf_cache)); + utarray_len(additional_write_buffers)); return -1; } else { logger_nDPIsrvd(remote, - "Buffer JSON string cache limit for", + "Buffer limit for", "reached, falling back to blocking I/O: %u lines", - utarray_len(remote->buf_cache)); - if (drain_cache_blocking(remote) != 0) + utarray_len(additional_write_buffers)); + if (drain_write_buffers_blocking(remote) != 0) { return -1; } } } - buf_src.ptr.raw = buf; - buf_src.used = buf_src.max = buf_src.json_string_length = json_string_length; - utarray_push_back(remote->buf_cache, &buf_src); + buf_src.buf.ptr.raw = buf; + buf_src.buf.used = buf_src.buf.max = json_string_length; + utarray_push_back(additional_write_buffers, &buf_src); return 0; } @@ -216,13 +284,20 @@ static void logger_nDPIsrvd(struct remote_desc const * const remote, static int drain_main_buffer(struct remote_desc * const remote) { - if (remote->buf.used == 0) + struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(remote); + + if (write_buffer == NULL) + { + return -1; + } + + if (write_buffer->buf.used == 0) { return 0; } errno = 0; - ssize_t bytes_written = write(remote->fd, remote->buf.ptr.raw, remote->buf.used); + ssize_t bytes_written = write(remote->fd, write_buffer->buf.ptr.raw, write_buffer->buf.used); if (errno == EAGAIN) { return 0; @@ -237,32 +312,37 @@ static int drain_main_buffer(struct remote_desc * const remote) logger_nDPIsrvd(remote, "Distributor connection", "closed"); return -1; } - if ((size_t)bytes_written < remote->buf.used) + if ((size_t)bytes_written < write_buffer->buf.used) { #if 0 logger_nDPIsrvd( remote, "Distributor", "wrote less than expected: %zd < %zu", bytes_written, remote->buf.used); #endif - memmove(remote->buf.ptr.raw, remote->buf.ptr.raw + bytes_written, remote->buf.used - bytes_written); + memmove(write_buffer->buf.ptr.raw, + write_buffer->buf.ptr.raw + bytes_written, + write_buffer->buf.used - bytes_written); } - remote->buf.used -= bytes_written; + write_buffer->buf.used -= bytes_written; return 0; } -static int drain_cache(struct remote_desc * const remote) +static int drain_write_buffers(struct remote_desc * const remote) { + UT_array * const additional_write_buffers = get_additional_write_buffers(remote); + errno = 0; - if (drain_main_buffer(remote) != 0) + if (drain_main_buffer(remote) != 0 || additional_write_buffers == NULL) { return -1; } - while (utarray_len(remote->buf_cache) > 0) + while (utarray_len(additional_write_buffers) > 0) { - struct nDPIsrvd_buffer * buf = (struct nDPIsrvd_buffer *)utarray_front(remote->buf_cache); - ssize_t written = write(remote->fd, buf->ptr.raw + buf->json_string_start, buf->json_string_length); + struct nDPIsrvd_write_buffer * buf = (struct nDPIsrvd_write_buffer *)utarray_front(additional_write_buffers); + ssize_t written = write(remote->fd, buf->buf.ptr.raw + buf->written, buf->buf.used - buf->written); + switch (written) { case -1: @@ -274,11 +354,10 @@ static int drain_cache(struct remote_desc * const remote) case 0: return -1; default: - buf->json_string_start += written; - buf->json_string_length -= written; - if (buf->json_string_length == 0) + buf->written += written; + if (buf->written == buf->buf.max) { - utarray_erase(remote->buf_cache, 0, 1); + utarray_erase(additional_write_buffers, 0, 1); } break; } @@ -287,7 +366,7 @@ static int drain_cache(struct remote_desc * const remote) return 0; } -static int drain_cache_blocking(struct remote_desc * const remote) +static int drain_write_buffers_blocking(struct remote_desc * const remote) { int retval = 0; @@ -296,9 +375,9 @@ static int drain_cache_blocking(struct remote_desc * const remote) logger_nDPIsrvd(remote, "Error setting distributor", "fd flags to blocking mode: %s", strerror(errno)); return -1; } - if (drain_cache(remote) != 0) + if (drain_write_buffers(remote) != 0) { - logger_nDPIsrvd(remote, "Could not drain buffer cache for", "in blocking I/O: %s", strerror(errno)); + logger_nDPIsrvd(remote, "Could not drain buffers for", "in blocking I/O: %s", strerror(errno)); retval = -1; } if (fcntl_add_flags(remote->fd, O_NONBLOCK) != 0) @@ -312,17 +391,19 @@ static int drain_cache_blocking(struct remote_desc * const remote) static int handle_outgoing_data(int epollfd, struct remote_desc * const remote) { - if (remote->sock_type != DISTRIBUTOR_UN && remote->sock_type != DISTRIBUTOR_IN) + UT_array * const additional_write_buffers = get_additional_write_buffers(remote); + + if (additional_write_buffers == NULL) { return -1; } - if (drain_cache(remote) != 0) + if (drain_write_buffers(remote) != 0) { - logger_nDPIsrvd(remote, "Could not drain buffer cache for", ": %s", strerror(errno)); + logger_nDPIsrvd(remote, "Could not drain buffers for", ": %s", strerror(errno)); disconnect_client(epollfd, remote); return -1; } - if (utarray_len(remote->buf_cache) == 0) + if (utarray_len(additional_write_buffers) == 0) { return del_out_event(epollfd, remote); } @@ -517,14 +598,42 @@ static struct remote_desc * get_remote_descriptor(enum sock_type type, int remot if (remotes.desc[i].fd == -1) { remotes.desc_used++; - if (remotes.desc[i].buf_cache == NULL) + + struct nDPIsrvd_write_buffer * write_buffer = NULL; + UT_array ** additional_write_buffers = NULL; + + switch (type) { - utarray_new(remotes.desc[i].buf_cache, &nDPIsrvd_buffer_array_icd); + case COLLECTOR_UN: + if (nDPIsrvd_json_buffer_init(&remotes.desc[i].event_collector_un.main_read_buffer, + max_buffer_size) != 0) + { + return NULL; + } + break; + case DISTRIBUTOR_UN: + write_buffer = &remotes.desc[i].event_distributor_un.main_write_buffer; + additional_write_buffers = &remotes.desc[i].event_distributor_un.additional_write_buffers; + break; + case DISTRIBUTOR_IN: + write_buffer = &remotes.desc[i].event_distributor_in.main_write_buffer; + additional_write_buffers = &remotes.desc[i].event_distributor_in.additional_write_buffers; + break; } - if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, max_buffer_size) != 0 || remotes.desc[i].buf_cache == NULL) + + if (additional_write_buffers != NULL && *additional_write_buffers == NULL) + { + utarray_new(*additional_write_buffers, &nDPIsrvd_buffer_array_icd); + if (*additional_write_buffers == NULL) + { + return NULL; + } + } + if (write_buffer != NULL && nDPIsrvd_buffer_init(&write_buffer->buf, max_buffer_size) != 0) { return NULL; } + remotes.desc[i].sock_type = type; remotes.desc[i].fd = remote_fd; return &remotes.desc[i]; @@ -538,12 +647,28 @@ static void free_remotes(void) { for (size_t i = 0; i < remotes.desc_size; ++i) { - if (remotes.desc[i].buf_cache != NULL) + switch (remotes.desc[i].sock_type) { - utarray_free(remotes.desc[i].buf_cache); - remotes.desc[i].buf_cache = NULL; + case COLLECTOR_UN: + nDPIsrvd_json_buffer_free(&remotes.desc[i].event_collector_un.main_read_buffer); + break; + case DISTRIBUTOR_UN: + if (remotes.desc[i].event_distributor_un.additional_write_buffers != NULL) + { + utarray_free(remotes.desc[i].event_distributor_un.additional_write_buffers); + remotes.desc[i].event_distributor_un.additional_write_buffers = NULL; + } + nDPIsrvd_buffer_free(&remotes.desc[i].event_distributor_un.main_write_buffer.buf); + break; + case DISTRIBUTOR_IN: + if (remotes.desc[i].event_distributor_in.additional_write_buffers != NULL) + { + utarray_free(remotes.desc[i].event_distributor_in.additional_write_buffers); + remotes.desc[i].event_distributor_in.additional_write_buffers = NULL; + } + nDPIsrvd_buffer_free(&remotes.desc[i].event_distributor_in.main_write_buffer.buf); + break; } - nDPIsrvd_buffer_free(&remotes.desc[i].buf); } } @@ -610,10 +735,25 @@ static void disconnect_client(int epollfd, struct remote_desc * const current) { case COLLECTOR_UN: logger_nDPIsrvd(current, "Error closing collector connection", ": %s", strerror(errno)); + nDPIsrvd_json_buffer_free(¤t->event_collector_un.main_read_buffer); break; case DISTRIBUTOR_UN: + logger_nDPIsrvd(current, "Error closing distributor connection", ": %s", strerror(errno)); + if (current->event_distributor_un.additional_write_buffers != NULL) + { + utarray_clear(current->event_distributor_un.additional_write_buffers); + } + nDPIsrvd_buffer_free(¤t->event_distributor_un.main_write_buffer.buf); + current->event_distributor_un.main_write_buffer.written = 0; + break; case DISTRIBUTOR_IN: logger_nDPIsrvd(current, "Error closing distributor connection", ": %s", strerror(errno)); + if (current->event_distributor_in.additional_write_buffers != NULL) + { + utarray_clear(current->event_distributor_in.additional_write_buffers); + } + nDPIsrvd_buffer_free(¤t->event_distributor_in.main_write_buffer.buf); + current->event_distributor_in.main_write_buffer.written = 0; break; } } @@ -625,11 +765,6 @@ static void disconnect_client(int epollfd, struct remote_desc * const current) current->fd = -1; remotes.desc_used--; } - if (current->buf_cache != NULL) - { - utarray_clear(current->buf_cache); - } - nDPIsrvd_buffer_free(¤t->buf); } static int nDPIsrvd_parse_options(int argc, char ** argv) @@ -684,14 +819,14 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) nDPIsrvd_options.group = strdup(optarg); break; case 'C': - if (str_value_to_ull(optarg, &nDPIsrvd_options.cache_array_length) != CONVERSION_OK) + if (str_value_to_ull(optarg, &nDPIsrvd_options.max_write_buffers) != CONVERSION_OK) { fprintf(stderr, "%s: Argument for `-C' is not a number: %s\n", argv[0], optarg); return 1; } break; case 'D': - nDPIsrvd_options.cache_fallback_to_blocking = 0; + nDPIsrvd_options.bufferbloat_fallback_to_blocking = 0; break; case 'v': fprintf(stderr, "%s", get_nDPId_version()); @@ -952,21 +1087,27 @@ static int new_connection(int epollfd, int eventfd) static int handle_collector_protocol(int epollfd, struct remote_desc * const current) { + struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current); char * json_str_start = NULL; - if (current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{') + if (json_read_buffer == NULL) + { + return 1; + } + + if (json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{') { logger_nDPIsrvd(current, "BUG: Collector connection", "JSON invalid opening character: '%c'", - current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]); + json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]); disconnect_client(epollfd, current); return 1; } errno = 0; - current->event_collector_un.json_bytes = strtoull((char *)current->buf.ptr.text, &json_str_start, 10); - current->event_collector_un.json_bytes += json_str_start - current->buf.ptr.text; + current->event_collector_un.json_bytes = strtoull(json_read_buffer->buf.ptr.text, &json_str_start, 10); + current->event_collector_un.json_bytes += json_str_start - json_read_buffer->buf.ptr.text; if (errno == ERANGE) { @@ -975,18 +1116,18 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur return 1; } - if (json_str_start == current->buf.ptr.text) + if (json_str_start == json_read_buffer->buf.ptr.text) { logger_nDPIsrvd(current, "BUG: Collector connection", "missing JSON string length in protocol preamble: \"%.*s\"", NETWORK_BUFFER_LENGTH_DIGITS, - current->buf.ptr.text); + json_read_buffer->buf.ptr.text); disconnect_client(epollfd, current); return 1; } - if (json_str_start - current->buf.ptr.text != NETWORK_BUFFER_LENGTH_DIGITS) + if (json_str_start - json_read_buffer->buf.ptr.text != NETWORK_BUFFER_LENGTH_DIGITS) { logger_nDPIsrvd(current, "BUG: Collector connection", @@ -994,33 +1135,33 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur "%ld " "bytes", NETWORK_BUFFER_LENGTH_DIGITS, - (long int)(json_str_start - current->buf.ptr.text)); + (long int)(json_str_start - json_read_buffer->buf.ptr.text)); } - if (current->event_collector_un.json_bytes > current->buf.max) + if (current->event_collector_un.json_bytes > json_read_buffer->buf.max) { logger_nDPIsrvd(current, "BUG: Collector connection", "JSON string too big: %llu > %zu", current->event_collector_un.json_bytes, - current->buf.max); + json_read_buffer->buf.max); disconnect_client(epollfd, current); return 1; } - if (current->event_collector_un.json_bytes > current->buf.used) + if (current->event_collector_un.json_bytes > json_read_buffer->buf.used) { return 1; } - if (current->buf.ptr.text[current->event_collector_un.json_bytes - 2] != '}' || - current->buf.ptr.text[current->event_collector_un.json_bytes - 1] != '\n') + if (json_read_buffer->buf.ptr.text[current->event_collector_un.json_bytes - 2] != '}' || + json_read_buffer->buf.ptr.text[current->event_collector_un.json_bytes - 1] != '\n') { logger_nDPIsrvd(current, "BUG: Collector connection", "invalid JSON string: %.*s", (int)current->event_collector_un.json_bytes, - current->buf.ptr.text); + json_read_buffer->buf.ptr.text); disconnect_client(epollfd, current); return 1; } @@ -1030,7 +1171,9 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur static int handle_incoming_data(int epollfd, struct remote_desc * const current) { - if (current->sock_type != COLLECTOR_UN) + struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current); + + if (json_read_buffer == NULL) { unsigned char garbage = 0; @@ -1047,18 +1190,19 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) } /* read JSON strings (or parts) from the UNIX socket (collecting) */ - if (current->buf.used == current->buf.max) + if (json_read_buffer->buf.used == json_read_buffer->buf.max) { logger_nDPIsrvd(current, "Collector connection", "read buffer (%zu bytes) full. No more read possible.", - current->buf.max); + json_read_buffer->buf.max); } else { errno = 0; - ssize_t bytes_read = - read(current->fd, current->buf.ptr.raw + current->buf.used, current->buf.max - current->buf.used); + ssize_t bytes_read = read(current->fd, + json_read_buffer->buf.ptr.raw + json_read_buffer->buf.used, + json_read_buffer->buf.max - json_read_buffer->buf.used); if (bytes_read < 0 || errno != 0) { logger_nDPIsrvd(current, "Could not read remote", ": %s", strerror(errno)); @@ -1071,10 +1215,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) disconnect_client(epollfd, current); return 1; } - current->buf.used += bytes_read; + json_read_buffer->buf.used += bytes_read; } - while (current->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1) + while (json_read_buffer->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1) { if (handle_collector_protocol(epollfd, current) != 0) { @@ -1083,19 +1227,18 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) for (size_t i = 0; i < remotes.desc_size; ++i) { - if (remotes.desc[i].fd < 0) - { - continue; - } - if (remotes.desc[i].sock_type != DISTRIBUTOR_UN && remotes.desc[i].sock_type != DISTRIBUTOR_IN) + struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(&remotes.desc[i]); + UT_array * const additional_write_buffers = get_additional_write_buffers(&remotes.desc[i]); + + if (remotes.desc[i].fd < 0 || write_buffer == NULL || additional_write_buffers == NULL) { continue; } - if (current->event_collector_un.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used || - utarray_len(remotes.desc[i].buf_cache) > 0) + if (current->event_collector_un.json_bytes > write_buffer->buf.max - write_buffer->buf.used || + utarray_len(additional_write_buffers) > 0) { - if (utarray_len(remotes.desc[i].buf_cache) == 0) + if (utarray_len(additional_write_buffers) == 0) { #if 0 logger_nDPIsrvd(&remotes.desc[i], @@ -1114,7 +1257,9 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) continue; } } - if (add_to_cache(&remotes.desc[i], current->buf.ptr.raw, current->event_collector_un.json_bytes) != 0) + if (add_to_additional_write_buffers(&remotes.desc[i], + json_read_buffer->buf.ptr.raw, + current->event_collector_un.json_bytes) != 0) { disconnect_client(epollfd, &remotes.desc[i]); continue; @@ -1122,10 +1267,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) } else { - memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used, - current->buf.ptr.raw, + memcpy(write_buffer->buf.ptr.raw + write_buffer->buf.used, + json_read_buffer->buf.ptr.raw, current->event_collector_un.json_bytes); - remotes.desc[i].buf.used += current->event_collector_un.json_bytes; + write_buffer->buf.used += current->event_collector_un.json_bytes; } if (drain_main_buffer(&remotes.desc[i]) != 0) @@ -1134,10 +1279,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) } } - memmove(current->buf.ptr.raw, - current->buf.ptr.raw + current->event_collector_un.json_bytes, - current->buf.used - current->event_collector_un.json_bytes); - current->buf.used -= current->event_collector_un.json_bytes; + memmove(json_read_buffer->buf.ptr.raw, + json_read_buffer->buf.ptr.raw + current->event_collector_un.json_bytes, + json_read_buffer->buf.used - current->event_collector_un.json_bytes); + json_read_buffer->buf.used -= current->event_collector_un.json_bytes; current->event_collector_un.json_bytes = 0; } @@ -1230,7 +1375,7 @@ static int mainloop(int epollfd) switch (current->sock_type) { case COLLECTOR_UN: - logger_nDPIsrvd(current, "Collector disconnected", "closed"); + logger_nDPIsrvd(current, "Collector connection", "closed"); break; case DISTRIBUTOR_UN: case DISTRIBUTOR_IN: |