summaryrefslogtreecommitdiff
path: root/nDPId.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2020-07-09 22:40:46 +0200
committerToni Uhlig <matzeton@googlemail.com>2020-07-09 22:40:46 +0200
commitd99bd825b23e4277d88a5149cfbbbc73f2ac206e (patch)
tree47d05fa6e415fcca6c6519338e988e9e75d48da6 /nDPId.c
parent9644a27b3fd2869ec0d29944cee286d7967a5bc1 (diff)
send json string to sink, added basic json event serialization fn call
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r--nDPId.c110
1 files changed, 72 insertions, 38 deletions
diff --git a/nDPId.c b/nDPId.c
index 92e8fa6c6..b8b3d6477 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -594,6 +594,51 @@ static int connect_to_json_socket(struct nDPId_reader_thread * const reader_thre
return 0;
}
+static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
+ char const * const json_str, size_t json_str_len)
+{
+ struct nDPId_workflow * const workflow = reader_thread->workflow;
+ int saved_errno;
+ int s_ret;
+ char newline_json_str[BUFSIZ];
+
+ s_ret = snprintf(newline_json_str, sizeof(newline_json_str), "%.*s\n", (int)json_str_len, json_str);
+ if (s_ret < 0 || s_ret > (int)sizeof(newline_json_str)) {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "[%8llu, %d] JSON buffer prepare failed",
+ workflow->packets_captured,
+ reader_thread->array_index);
+ return;
+ }
+
+ if (reader_thread->json_sock_reconnect != 0) {
+ if (connect_to_json_socket(reader_thread) == 0) {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "[%8llu, %d] Reconnected to JSON sink",
+ workflow->packets_captured,
+ reader_thread->array_index);
+ }
+ }
+
+ if (reader_thread->json_sock_reconnect == 0 &&
+ send(reader_thread->json_sockfd, newline_json_str, s_ret, MSG_NOSIGNAL) < 0)
+ {
+ saved_errno = errno;
+ syslog(LOG_DAEMON | LOG_ERR,
+ "[%8llu, %d] send data to JSON sink failed: %s",
+ workflow->packets_captured,
+ reader_thread->array_index,
+ strerror(saved_errno));
+ if (saved_errno == EPIPE) {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "[%8llu, %d] Lost connection to JSON sink",
+ workflow->packets_captured,
+ reader_thread->array_index);
+ }
+ reader_thread->json_sock_reconnect = 1;
+ }
+}
+
static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
struct nDPId_flow_info const * const flow,
enum flow_event event)
@@ -601,7 +646,6 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
char * json_str;
uint32_t json_str_len = 0;
struct nDPId_workflow * const workflow = reader_thread->workflow;
- int saved_errno;
switch (event) {
case FLOW_NEW:
@@ -625,7 +669,7 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
}
json_str = jsonize_flow(workflow, flow, &json_str_len);
- if (json_str == NULL) {
+ if (json_str == NULL || json_str_len == 0) {
syslog(LOG_DAEMON | LOG_ERR,
"[%8llu, %d, %4u] jsonize failed, buffer length: %u\n",
workflow->packets_captured,
@@ -633,50 +677,31 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
flow->flow_id,
json_str_len);
} else {
- if (reader_thread->json_sock_reconnect != 0) {
- if (connect_to_json_socket(reader_thread) == 0) {
- syslog(LOG_DAEMON | LOG_ERR,
- "[%8llu, %d, %4u] Reconnected to JSON sink",
- workflow->packets_captured,
- reader_thread->array_index,
- flow->flow_id);
- }
- }
-
- if (reader_thread->json_sock_reconnect == 0 &&
- send(reader_thread->json_sockfd, json_str, json_str_len, MSG_NOSIGNAL) < 0)
- {
- saved_errno = errno;
- syslog(LOG_DAEMON | LOG_ERR,
- "[%8llu, %d, %4u] send data to JSON sink failed: %s",
- workflow->packets_captured,
- reader_thread->array_index,
- flow->flow_id, strerror(saved_errno));
- if (saved_errno == EPIPE) {
- syslog(LOG_DAEMON | LOG_ERR,
- "[%8llu, %d, %4u] Lost connection to JSON sink",
- workflow->packets_captured,
- reader_thread->array_index,
- flow->flow_id);
- }
- reader_thread->json_sock_reconnect = 1;
- }
+ send_to_json_sink(reader_thread, json_str, json_str_len);
}
ndpi_reset_serializer(&workflow->ndpi_serializer);
}
static void jsonize_format_error(struct nDPId_reader_thread * const reader_thread, uint32_t format_index)
{
- char * out;
- uint32_t out_size = 0;
+ char * json_str;
+ uint32_t json_str_len = 0;
ndpi_serialize_string_string(&reader_thread->workflow->ndpi_serializer,
"serializer-error", "format");
ndpi_serialize_string_uint32(&reader_thread->workflow->ndpi_serializer,
"serializer-format-index", format_index);
- out = ndpi_serializer_get_buffer(&reader_thread->workflow->ndpi_serializer, &out_size);
- if (out != NULL && out_size > 0) {
- printf("ERR: %s\n", out);
+ json_str = ndpi_serializer_get_buffer(&reader_thread->workflow->ndpi_serializer, &json_str_len);
+ if (json_str != NULL && json_str_len == 0) {
+
+ syslog(LOG_DAEMON | LOG_ERR,
+ "[%8llu, %d] jsonize failed, buffer length: %u\n",
+ reader_thread->workflow->packets_captured,
+ reader_thread->array_index,
+ json_str_len);
+ } else {
+
+ send_to_json_sink(reader_thread, json_str, json_str_len);
}
ndpi_reset_serializer(&reader_thread->workflow->ndpi_serializer);
}
@@ -692,6 +717,10 @@ static void jsonize_basic_event(struct nDPId_reader_thread * const reader_thread
(void)reader_thread;
va_start(ap, format);
while (*format) {
+ if (got_jsonkey == 0) {
+ json_key[0] = '\0';
+ }
+
switch (*format++) {
case 's': {
format_index++;
@@ -725,12 +754,12 @@ static void jsonize_basic_event(struct nDPId_reader_thread * const reader_thread
jsonize_format_error(reader_thread, format_index);
return;
}
- if (*(format++) == 'd') {
+ if (*format == 'd') {
long long int value = va_arg(ap, long long int);
ndpi_serialize_string_int64(&reader_thread->workflow->ndpi_serializer,
json_key, value);
got_jsonkey = 0;
- } else if (*(format++) == 'u') {
+ } else if (*format == 'u') {
unsigned long long int value = va_arg(ap, unsigned long long int);
ndpi_serialize_string_uint64(&reader_thread->workflow->ndpi_serializer,
json_key, value);
@@ -739,6 +768,7 @@ static void jsonize_basic_event(struct nDPId_reader_thread * const reader_thread
jsonize_format_error(reader_thread, format_index);
return;
}
+ format++;
break;
case 'u':
format_index++;
@@ -764,8 +794,9 @@ static void jsonize_basic_event(struct nDPId_reader_thread * const reader_thread
return;
}
break;
+ case ' ':
+ case ',':
case '%':
- format_index++;
break;
default:
jsonize_format_error(reader_thread, format_index);
@@ -834,6 +865,9 @@ static void ndpi_process_packet(uint8_t * const args,
break;
case DLT_EN10MB:
if (header->len < sizeof(struct ndpi_ethhdr)) {
+ jsonize_basic_event(reader_thread, "%s%lu %s%lu %s%d %s%s", "packet_id", workflow->packets_captured,
+ "thread_id", reader_thread->array_index, "msg_id", 0,
+ "msg", "Ethernet packet too short - skipping");
syslog(LOG_DAEMON | LOG_WARNING,
"[%8llu, %d] Ethernet packet too short - skipping\n",
workflow->packets_captured,