diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2021-06-07 14:42:40 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2021-06-07 15:04:46 +0200 |
commit | 54e0601fec5330aa93810c7212dde233c4354379 (patch) | |
tree | ed44b4b3d72d407a73c295a3df9952a255905891 | |
parent | 382706cd20b99f12b0e7106ffc4af0e01a5e0e02 (diff) |
Unified IO buffer mgmt.
* c-collectd gives the user control over collectd-exec instance name
* added missing collectd type `flow_l4_icmp_count`
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rw-r--r-- | TODO.md | 9 | ||||
-rw-r--r-- | dependencies/nDPIsrvd.h | 59 | ||||
-rw-r--r-- | examples/c-collectd/c-collectd.c | 23 | ||||
-rw-r--r-- | examples/c-collectd/plugin_nDPIsrvd_types.db | 1 | ||||
-rw-r--r-- | nDPId-test.c | 27 | ||||
-rw-r--r-- | nDPId.c | 50 | ||||
-rw-r--r-- | nDPIsrvd.c | 61 |
7 files changed, 137 insertions, 93 deletions
@@ -1,7 +1,6 @@ # TODOs -1. unify `struct io_buffer` from nDPIsrvd.c and `struct nDPIsrvd_buffer` from nDPIsrvd.h -2. improve nDPIsrvd buffer bloat handling (Do not fall back to blocking mode!) -3. improve UDP/TCP timeout handling by reading netfilter conntrack timeouts from /proc (or just read the conntrack table directly) -4. detect interface / timeout changes and apply them to nDPId -5. implement AEAD crypto via libsodium (at least for TCP communication) +1. improve nDPIsrvd buffer bloat handling (Do not fall back to blocking mode!) +2. improve UDP/TCP timeout handling by reading netfilter conntrack timeouts from /proc (or just read conntrack table entries) +3. detect interface / timeout changes and apply them to nDPId +4. implement AEAD crypto via libsodium (at least for TCP communication) diff --git a/dependencies/nDPIsrvd.h b/dependencies/nDPIsrvd.h index 35f6133a3..764dd204a 100644 --- a/dependencies/nDPIsrvd.h +++ b/dependencies/nDPIsrvd.h @@ -134,8 +134,12 @@ struct nDPIsrvd_address struct nDPIsrvd_buffer { - char raw[NETWORK_BUFFER_MAX_SIZE]; + union { + char * text; + uint8_t * raw; + } ptr; size_t used; + size_t max; char * json_string; size_t json_string_start; nDPIsrvd_ull json_string_length; @@ -288,6 +292,33 @@ static inline char const * nDPIsrvd_enum_to_string(int enum_value) return enum_str[enum_value - FIRST_ENUM_VALUE]; } +static inline int nDPIsrvd_buffer_init(struct nDPIsrvd_buffer * const buffer, size_t buffer_size) +{ + if (buffer->ptr.raw != NULL && buffer->max != buffer_size) + { + return 1; /* Do not fail and realloc()? */ + } + + buffer->ptr.raw = (uint8_t *)malloc(buffer_size); + if (buffer->ptr.raw == NULL) + { + return 1; + } + + buffer->json_string_start = 0; + buffer->json_string_length = 0ull; + buffer->json_string = NULL; + buffer->used = 0; + buffer->max = buffer_size; + + return 0; +} + +static inline void nDPIsrvd_buffer_free(struct nDPIsrvd_buffer * const buffer) +{ + free(buffer->ptr.raw); +} + static inline struct nDPIsrvd_socket * nDPIsrvd_init(size_t global_user_data_size, size_t flow_user_data_size, json_callback json_cb, @@ -306,6 +337,10 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_init(size_t global_user_data_siz memset(sock, 0, sizeof(*sock)); sock->fd = -1; + if (nDPIsrvd_buffer_init(&sock->buffer, NETWORK_BUFFER_MAX_SIZE) != 0) + { + goto error; + } sock->address.raw.sa_family = -1; sock->flow_user_data_size = flow_user_data_size; @@ -324,6 +359,7 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_init(size_t global_user_data_siz return sock; error: + nDPIsrvd_buffer_free(&sock->buffer); nDPIsrvd_free(&sock); return NULL; } @@ -368,6 +404,7 @@ static inline void nDPIsrvd_free(struct nDPIsrvd_socket ** const sock) (*sock)->flow_table = NULL; } + nDPIsrvd_buffer_free(&(*sock)->buffer); free(*sock); *sock = NULL; @@ -464,7 +501,7 @@ static inline enum nDPIsrvd_connect_return nDPIsrvd_connect(struct nDPIsrvd_sock static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * const sock) { ssize_t bytes_read = - read(sock->fd, sock->buffer.raw + sock->buffer.used, sizeof(sock->buffer.raw) - sock->buffer.used); + read(sock->fd, sock->buffer.ptr.raw + sock->buffer.used, sock->buffer.max - sock->buffer.used); if (bytes_read == 0) { @@ -664,25 +701,25 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_buf { return PARSE_NEED_MORE_DATA; } - if (buffer->raw[NETWORK_BUFFER_LENGTH_DIGITS] != '{') + if (buffer->ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{') { return PARSE_INVALID_OPENING_CHAR; } errno = 0; - buffer->json_string_length = strtoull((const char *)buffer->raw, &buffer->json_string, 10); - buffer->json_string_length += buffer->json_string - buffer->raw; - buffer->json_string_start = buffer->json_string - buffer->raw; + buffer->json_string_length = strtoull((const char *)buffer->ptr.text, &buffer->json_string, 10); + buffer->json_string_length += buffer->json_string - buffer->ptr.text; + buffer->json_string_start = buffer->json_string - buffer->ptr.text; if (errno == ERANGE) { return PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT; } - if (buffer->json_string == buffer->raw) + if (buffer->json_string == buffer->ptr.text) { return PARSE_SIZE_MISSING; } - if (buffer->json_string_length > sizeof(buffer->raw)) + if (buffer->json_string_length > buffer->max) { return PARSE_STRING_TOO_BIG; } @@ -690,14 +727,14 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_buf { return PARSE_NEED_MORE_DATA; } - if (buffer->raw[buffer->json_string_length - 2] != '}' || buffer->raw[buffer->json_string_length - 1] != '\n') + if (buffer->ptr.text[buffer->json_string_length - 2] != '}' || buffer->ptr.text[buffer->json_string_length - 1] != '\n') { return PARSE_INVALID_CLOSING_CHAR; } jsmn_init(&jsmn->parser); jsmn->tokens_found = jsmn_parse(&jsmn->parser, - (char *)(buffer->raw + buffer->json_string_start), + buffer->ptr.text + buffer->json_string_start, buffer->json_string_length - buffer->json_string_start, jsmn->tokens, nDPIsrvd_MAX_JSON_TOKENS); @@ -711,7 +748,7 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_buf static void nDPIsrvd_drain_buffer(struct nDPIsrvd_buffer * const buffer) { - memmove(buffer->raw, buffer->raw + buffer->json_string_length, buffer->used - buffer->json_string_length); + memmove(buffer->ptr.raw, buffer->ptr.raw + buffer->json_string_length, buffer->used - buffer->json_string_length); buffer->used -= buffer->json_string_length; buffer->json_string_length = 0; buffer->json_string_start = 0; diff --git a/examples/c-collectd/c-collectd.c b/examples/c-collectd/c-collectd.c index 42e8b6020..59f724773 100644 --- a/examples/c-collectd/c-collectd.c +++ b/examples/c-collectd/c-collectd.c @@ -10,6 +10,8 @@ #include "nDPIsrvd.h" +#define DEFAULT_COLLECTD_EXEC_INST "exec-nDPIsrvd" + #define LOG(flags, format, ...) \ if (quiet == 0) \ { \ @@ -28,6 +30,7 @@ static int collectd_timerfd = -1; static char * serv_optarg = NULL; static char * collectd_hostname = NULL; static char * collectd_interval = NULL; +static char * instance_name = NULL; static nDPIsrvd_ull collectd_interval_ull = 0uL; static int quiet = 0; @@ -134,16 +137,19 @@ static int parse_options(int argc, char ** argv) static char const usage[] = "Usage: %s " - "[-s host] [-c hostname] [-i interval] [-q]\n\n" + "[-s host] [-c hostname] [-n collectd-instance-name] [-i interval] [-q]\n\n" "\t-s\tDestination where nDPIsrvd is listening on.\n" "\t-c\tCollectd hostname.\n" "\t \tThis value defaults to the environment variable COLLECTD_HOSTNAME.\n" + "\t-n\tName of the collectd(-exec) instance.\n" + "\t \tDefaults to: " DEFAULT_COLLECTD_EXEC_INST + "\n" "\t-i\tInterval between print statistics to stdout.\n" "\t \tThis value defaults to the environment variable COLLECTD_INTERVAL.\n" "\t-q\tDo not print anything except collectd statistics.\n" "\t \tAutomatically enabled if environment variables mentioned above are set.\n"; - while ((opt = getopt(argc, argv, "hs:c:i:q")) != -1) + while ((opt = getopt(argc, argv, "hs:c:n:i:q")) != -1) { switch (opt) { @@ -155,6 +161,10 @@ static int parse_options(int argc, char ** argv) free(collectd_hostname); collectd_hostname = strdup(optarg); break; + case 'n': + free(instance_name); + instance_name = strdup(optarg); + break; case 'i': free(collectd_interval); collectd_interval = strdup(optarg); @@ -182,6 +192,11 @@ static int parse_options(int argc, char ** argv) } } + if (instance_name == NULL) + { + instance_name = strdup(DEFAULT_COLLECTD_EXEC_INST); + } + if (collectd_interval == NULL) { collectd_interval = getenv("COLLECTD_INTERVAL"); @@ -217,9 +232,9 @@ static int parse_options(int argc, char ** argv) return 0; } -#define COLLECTD_PUTVAL_N_FORMAT(name) "PUTVAL %s/nDPId/" #name " interval=%llu %llu:%llu\n" +#define COLLECTD_PUTVAL_N_FORMAT(name) "PUTVAL %s/%s/" #name " interval=%llu %llu:%llu\n" #define COLLECTD_PUTVAL_N(value) \ - collectd_hostname, collectd_interval_ull, (unsigned long long int)now, \ + collectd_hostname, instance_name, collectd_interval_ull, (unsigned long long int)now, \ (unsigned long long int)collectd_statistics.value static void print_collectd_exec_output(void) { diff --git a/examples/c-collectd/plugin_nDPIsrvd_types.db b/examples/c-collectd/plugin_nDPIsrvd_types.db index 7211939c3..dc910a3bb 100644 --- a/examples/c-collectd/plugin_nDPIsrvd_types.db +++ b/examples/c-collectd/plugin_nDPIsrvd_types.db @@ -60,6 +60,7 @@ flow_category_unknown_count value:GAUGE:0:U flow_l3_ip4_count value:GAUGE:0:U flow_l3_ip6_count value:GAUGE:0:U flow_l3_other_count value:GAUGE:0:U +flow_l4_icmp_count value:GAUGE:0:U flow_l4_tcp_count value:GAUGE:0:U flow_l4_udp_count value:GAUGE:0:U flow_l4_other_count value:GAUGE:0:U diff --git a/nDPId-test.c b/nDPId-test.c index 37d3e2343..d8ef7847f 100644 --- a/nDPId-test.c +++ b/nDPId-test.c @@ -135,47 +135,38 @@ error: return NULL; } -static enum nDPIsrvd_parse_return parse_json_lines(struct io_buffer * const buffer) +static enum nDPIsrvd_parse_return parse_json_lines(struct nDPIsrvd_buffer * const buffer) { - struct nDPIsrvd_buffer buf = {}; struct nDPIsrvd_jsmn jsmn = {}; - size_t const n = (buffer->used > sizeof(buf.raw) ? sizeof(buf.raw) : buffer->used); + size_t const n = (buffer->used > buffer->max ? buffer->max : buffer->used); if (n > NETWORK_BUFFER_MAX_SIZE) { return PARSE_STRING_TOO_BIG; } - memcpy(buf.raw, buffer->ptr, n); - buf.used = buffer->used; - enum nDPIsrvd_parse_return ret; - while ((ret = nDPIsrvd_parse_line(&buf, &jsmn)) == PARSE_OK) + while ((ret = nDPIsrvd_parse_line(buffer, &jsmn)) == PARSE_OK) { if (jsmn.tokens_found == 0) { return PARSE_JSMN_ERROR; } - nDPIsrvd_drain_buffer(&buf); + nDPIsrvd_drain_buffer(buffer); } - memcpy(buffer->ptr, buf.raw, buf.used); - buffer->used = buf.used; - return ret; } static void * distributor_client_mainloop_thread(void * const arg) { - struct io_buffer client_buffer = {.ptr = (uint8_t *)malloc(NETWORK_BUFFER_MAX_SIZE), - .max = NETWORK_BUFFER_MAX_SIZE, - .used = 0}; + struct nDPIsrvd_buffer client_buffer = {}; int dis_epollfd = create_evq(); int signalfd = setup_signalfd(dis_epollfd); struct epoll_event events[32]; size_t const events_size = sizeof(events) / sizeof(events[0]); - if (client_buffer.ptr == NULL || dis_epollfd < 0 || signalfd < 0) + if (nDPIsrvd_buffer_init(&client_buffer, NETWORK_BUFFER_MAX_SIZE) != 0 || dis_epollfd < 0 || signalfd < 0) { THREAD_ERROR_GOTO(arg); } @@ -198,7 +189,7 @@ static void * distributor_client_mainloop_thread(void * const arg) if (events[i].data.fd == mock_servfds[PIPE_READ]) { ssize_t bytes_read = read(mock_servfds[PIPE_READ], - client_buffer.ptr + client_buffer.used, + client_buffer.ptr.raw + client_buffer.used, client_buffer.max - client_buffer.used); if (bytes_read == 0) { @@ -208,7 +199,7 @@ static void * distributor_client_mainloop_thread(void * const arg) { THREAD_ERROR_GOTO(arg); } - printf("%.*s", (int)bytes_read, client_buffer.ptr + client_buffer.used); + printf("%.*s", (int)bytes_read, client_buffer.ptr.text + client_buffer.used); client_buffer.used += bytes_read; enum nDPIsrvd_parse_return parse_ret = parse_json_lines(&client_buffer); @@ -247,7 +238,7 @@ error: del_event(dis_epollfd, mock_servfds[PIPE_READ]); close(dis_epollfd); close(signalfd); - free(client_buffer.ptr); + nDPIsrvd_buffer_free(&client_buffer); return NULL; } @@ -1309,7 +1309,9 @@ static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enu "reader-thread-count", nDPId_options.reader_thread_count); ndpi_serialize_string_int64(&workflow->ndpi_serializer, "idle-scan-period", nDPId_options.idle_scan_period); - ndpi_serialize_string_int64(&workflow->ndpi_serializer, "generic-max-idle-time", nDPId_options.generic_max_idle_time); + ndpi_serialize_string_int64(&workflow->ndpi_serializer, + "generic-max-idle-time", + nDPId_options.generic_max_idle_time); ndpi_serialize_string_int64(&workflow->ndpi_serializer, "icmp-max-idle-time", nDPId_options.icmp_max_idle_time); ndpi_serialize_string_int64(&workflow->ndpi_serializer, "udp-max-idle-time", nDPId_options.udp_max_idle_time); ndpi_serialize_string_int64(&workflow->ndpi_serializer, "tcp-max-idle-time", nDPId_options.tcp_max_idle_time); @@ -1337,7 +1339,9 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_tot_l4_payload_len", flow_ext->total_l4_payload_len); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_avg_l4_payload_len", - (flow_ext->packets_processed > 0 ? flow_ext->total_l4_payload_len / flow_ext->packets_processed : 0)); + (flow_ext->packets_processed > 0 + ? flow_ext->total_l4_payload_len / flow_ext->packets_processed + : 0)); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "midstream", flow_ext->flow_basic.tcp_is_midstream_flow); } @@ -1962,14 +1966,14 @@ static uint32_t calculate_ndpi_flow_struct_hash(struct ndpi_flow_struct const * return hash; } -#define SNAP 0xaa +#define SNAP 0xaa /* mask for FCF */ -#define WIFI_DATA 0x2 /* 0000 0010 */ -#define FCF_TYPE(fc) (((fc) >> 2) & 0x3) /* 0000 0011 = 0x3 */ -#define FCF_TO_DS(fc) ((fc) & 0x0100) -#define FCF_FROM_DS(fc) ((fc) & 0x0200) +#define WIFI_DATA 0x2 /* 0000 0010 */ +#define FCF_TYPE(fc) (((fc) >> 2) & 0x3) /* 0000 0011 = 0x3 */ +#define FCF_TO_DS(fc) ((fc)&0x0100) +#define FCF_FROM_DS(fc) ((fc)&0x0200) /* mask for Bad FCF presence */ -#define BAD_FCS 0x50 /* 0101 0000 */ +#define BAD_FCS 0x50 /* 0101 0000 */ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thread, struct pcap_pkthdr const * const header, uint8_t const * const packet, @@ -2018,7 +2022,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre return 1; } - struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; + struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset]; *ip_offset = sizeof(struct ndpi_chdlc); *layer3_type = ntohs(chdlc->proto_code); break; @@ -2034,12 +2038,14 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre if (packet[0] == 0x0f || packet[0] == 0x8f) { - struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; + struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset]; *ip_offset = sizeof(struct ndpi_chdlc); /* CHDLC_OFF = 4 */ *layer3_type = ntohs(chdlc->proto_code); - } else { + } + else + { *ip_offset = 2; - *layer3_type = ntohs(*((u_int16_t*)&packet[eth_offset])); + *layer3_type = ntohs(*((u_int16_t *)&packet[eth_offset])); } break; case DLT_LINUX_SLL: @@ -2050,7 +2056,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre return 1; } - *layer3_type = (packet[eth_offset+14] << 8) + packet[eth_offset+15]; + *layer3_type = (packet[eth_offset + 14] << 8) + packet[eth_offset + 15]; *ip_offset = 16 + eth_offset; break; case DLT_IEEE802_11_RADIO: @@ -2062,7 +2068,8 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre return 1; } - struct ndpi_radiotap_header const * const radiotap = (struct ndpi_radiotap_header const * const)&packet[eth_offset]; + struct ndpi_radiotap_header const * const radiotap = + (struct ndpi_radiotap_header const * const) & packet[eth_offset]; uint16_t radio_len = radiotap->len; /* Check Bad FCS presence */ @@ -2081,19 +2088,21 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre } /* Calculate 802.11 header length (variable) */ - struct ndpi_wifi_header const * const wifi = (struct ndpi_wifi_header const * const)(packet + eth_offset + radio_len); + struct ndpi_wifi_header const * const wifi = + (struct ndpi_wifi_header const * const)(packet + eth_offset + radio_len); uint16_t fc = wifi->fc; int wifi_len = 0; /* check wifi data presence */ if (FCF_TYPE(fc) == WIFI_DATA) { - if ((FCF_TO_DS(fc) && FCF_FROM_DS(fc) == 0x0) || - (FCF_TO_DS(fc) == 0x0 && FCF_FROM_DS(fc))) + if ((FCF_TO_DS(fc) && FCF_FROM_DS(fc) == 0x0) || (FCF_TO_DS(fc) == 0x0 && FCF_FROM_DS(fc))) { wifi_len = 26; /* + 4 byte fcs */ } - } else { + } + else + { /* no data frames */ break; } @@ -2104,7 +2113,8 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre return 1; } - struct ndpi_llc_header_snap const * const llc = (struct ndpi_llc_header_snap const * const)(packet + eth_offset + wifi_len + radio_len); + struct ndpi_llc_header_snap const * const llc = + (struct ndpi_llc_header_snap const * const)(packet + eth_offset + wifi_len + radio_len); if (llc->dsap == SNAP) { *layer3_type = ntohs(llc->snap.proto_ID); @@ -2397,7 +2407,7 @@ static void ndpi_process_packet(uint8_t * const args, return; } tcp = (struct ndpi_tcphdr *)l4_ptr; - l4_payload_len = ndpi_max(0, l4_len-4*tcp->doff); + l4_payload_len = ndpi_max(0, l4_len - 4 * tcp->doff); flow_basic.tcp_fin_rst_seen = (tcp->fin == 1 || tcp->rst == 1 ? 1 : 0); flow_basic.tcp_is_midstream_flow = (tcp->syn == 0 ? 1 : 0); flow_basic.src_port = ntohs(tcp->source); diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 7e3a62167..7e5f80a9e 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -18,13 +18,6 @@ #include "nDPIsrvd.h" #include "utils.h" -struct io_buffer -{ - uint8_t * ptr; - size_t used; - size_t max; -}; - enum sock_type { JSON_SOCK, @@ -35,7 +28,7 @@ struct remote_desc { enum sock_type sock_type; int fd; - struct io_buffer buf; + struct nDPIsrvd_buffer buf; union { struct { @@ -52,7 +45,7 @@ struct remote_desc }; }; -static struct remotes +static struct { struct remote_desc * desc; size_t desc_size; @@ -181,9 +174,10 @@ static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, in if (remotes.desc[i].fd == -1) { remotes.desc_used++; - remotes.desc[i].buf.ptr = (uint8_t *)malloc(NETWORK_BUFFER_MAX_SIZE); - remotes.desc[i].buf.max = NETWORK_BUFFER_MAX_SIZE; - remotes.desc[i].buf.used = 0; + if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, NETWORK_BUFFER_MAX_SIZE) != 0) + { + return NULL; + } remotes.desc[i].sock_type = type; remotes.desc[i].fd = remote_fd; return &remotes.desc[i]; @@ -226,8 +220,7 @@ static void disconnect_client(int epollfd, struct remote_desc * const current) current->fd = -1; remotes.desc_used--; } - free(current->buf.ptr); - current->buf.ptr = NULL; + nDPIsrvd_buffer_free(¤t->buf); } static int nDPIsrvd_parse_options(int argc, char ** argv) @@ -437,18 +430,18 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur { char * json_str_start = NULL; - if (current->buf.ptr[NETWORK_BUFFER_LENGTH_DIGITS] != '{') + if (current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{') { syslog(LOG_DAEMON | LOG_ERR, "BUG: JSON invalid opening character: '%c'", - current->buf.ptr[NETWORK_BUFFER_LENGTH_DIGITS]); + current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]); disconnect_client(epollfd, current); return 1; } errno = 0; - current->event_json.json_bytes = strtoull((char *)current->buf.ptr, &json_str_start, 10); - current->event_json.json_bytes += (uint8_t *)json_str_start - current->buf.ptr; + current->event_json.json_bytes = strtoull((char *)current->buf.ptr.text, &json_str_start, 10); + current->event_json.json_bytes += json_str_start - current->buf.ptr.text; if (errno == ERANGE) { @@ -457,12 +450,12 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur return 1; } - if ((uint8_t *)json_str_start == current->buf.ptr) + if (json_str_start == current->buf.ptr.text) { syslog(LOG_DAEMON | LOG_ERR, "BUG: Missing size before JSON string: \"%.*s\"", NETWORK_BUFFER_LENGTH_DIGITS, - current->buf.ptr); + current->buf.ptr.text); disconnect_client(epollfd, current); return 1; } @@ -482,13 +475,13 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur return 1; } - if (current->buf.ptr[current->event_json.json_bytes - 2] != '}' || - current->buf.ptr[current->event_json.json_bytes - 1] != '\n') + if (current->buf.ptr.text[current->event_json.json_bytes - 2] != '}' || + current->buf.ptr.text[current->event_json.json_bytes - 1] != '\n') { syslog(LOG_DAEMON | LOG_ERR, "BUG: Invalid JSON string: %.*s", (int)current->event_json.json_bytes, - current->buf.ptr); + current->buf.ptr.text); disconnect_client(epollfd, current); return 1; } @@ -512,7 +505,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) { errno = 0; ssize_t bytes_read = - read(current->fd, current->buf.ptr + current->buf.used, current->buf.max - current->buf.used); + read(current->fd, current->buf.ptr.raw + current->buf.used, current->buf.max - current->buf.used); if (bytes_read < 0 || errno != 0) { disconnect_client(epollfd, current); @@ -564,7 +557,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) disconnect_client(epollfd, &remotes.desc[i]); continue; } - if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used) != + if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used) != (ssize_t)remotes.desc[i].buf.used) { syslog(LOG_DAEMON | LOG_ERR, @@ -582,13 +575,13 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) } } - memcpy(remotes.desc[i].buf.ptr + remotes.desc[i].buf.used, - current->buf.ptr, + memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used, + current->buf.ptr.raw, current->event_json.json_bytes); remotes.desc[i].buf.used += current->event_json.json_bytes; errno = 0; - ssize_t bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used); + ssize_t bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used); if (errno == EAGAIN) { continue; @@ -630,8 +623,8 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) ntohs(remotes.desc[i].event_serv.peer.sin_port), bytes_written, remotes.desc[i].buf.used); - memmove(remotes.desc[i].buf.ptr, - remotes.desc[i].buf.ptr + bytes_written, + memmove(remotes.desc[i].buf.ptr.raw, + remotes.desc[i].buf.ptr.raw + bytes_written, remotes.desc[i].buf.used - bytes_written); remotes.desc[i].buf.used -= bytes_written; continue; @@ -640,8 +633,8 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) remotes.desc[i].buf.used = 0; } - memmove(current->buf.ptr, - current->buf.ptr + current->event_json.json_bytes, + memmove(current->buf.ptr.raw, + current->buf.ptr.raw + current->event_json.json_bytes, current->buf.used - current->event_json.json_bytes); current->buf.used -= current->event_json.json_bytes; current->event_json.json_bytes = 0; @@ -819,7 +812,7 @@ static int setup_remote_descriptors(size_t max_descriptors) { remotes.desc_used = 0; remotes.desc_size = max_descriptors; - remotes.desc = (struct remote_desc *)malloc(remotes.desc_size * sizeof(*remotes.desc)); + remotes.desc = (struct remote_desc *)calloc(remotes.desc_size, sizeof(*remotes.desc)); if (remotes.desc == NULL) { return -1; @@ -827,8 +820,6 @@ static int setup_remote_descriptors(size_t max_descriptors) for (size_t i = 0; i < remotes.desc_size; ++i) { remotes.desc[i].fd = -1; - remotes.desc[i].buf.ptr = NULL; - remotes.desc[i].buf.max = 0; } return 0; |