aboutsummaryrefslogtreecommitdiff
path: root/example/serializer/json2tlv.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'example/serializer/json2tlv.cpp')
-rw-r--r--example/serializer/json2tlv.cpp99
1 files changed, 83 insertions, 16 deletions
diff --git a/example/serializer/json2tlv.cpp b/example/serializer/json2tlv.cpp
index 2d7a4b0ef..8d43e4ec0 100644
--- a/example/serializer/json2tlv.cpp
+++ b/example/serializer/json2tlv.cpp
@@ -6,9 +6,16 @@
#include "json.h"
#include "ndpi_main.h"
+#include <zmq.h>
using namespace std;
+struct zmq_msg_hdr {
+ char url[32];
+ u_int32_t version;
+ u_int32_t size;
+};
+
static pair<char *, size_t> get_corpus(string filename) {
ifstream is(filename, ios::binary);
@@ -25,7 +32,7 @@ static pair<char *, size_t> get_corpus(string filename) {
return make_pair((char *)aligned_buffer, length);
}
- cerr << "JSON file not found or empty\n";
+ cerr << "JSON file " << filename << "not found or empty\n";
exit(1);
}
@@ -93,23 +100,25 @@ void json_to_tlv(json_object * jobj, ndpi_serializer *serializer) {
}
void print_help(char *bin) {
- cerr << "Usage: " << bin << " -v -i <JSON file>\n";
+ cerr << "Usage: " << bin << " -i <JSON file> [-z <ZMQ endpoint>] [-E <num encoding loops] [-D <num decoding loop>] [-v]\n";
cerr << "Note: the JSON file should contain an array of records\n";
}
int main(int argc, char *argv[]) {
char *json_path = NULL;
- int repeat = 1;
+ char* zmq_endpoint = NULL;
+ void *zmq_sock = NULL;
+ void *zmq_context = NULL;
+ int enc_repeat = 1, dec_repeat = 1;
int verbose = 0;
struct timeval t1, t2;
uint64_t total_time_usec;
ndpi_serializer *serializer;
ndpi_serializer deserializer;
- int i, num_elements;
- int rc;
+ int rc, i, num_elements, exported_flows = 0;
char c;
- while ((c = getopt(argc, argv,"hi:v")) != '?') {
+ while ((c = getopt(argc, argv,"hi:vz:E:D:")) != '?') {
if (c == (char) 255 || c == -1) break;
switch(c) {
@@ -125,6 +134,18 @@ int main(int argc, char *argv[]) {
case 'v':
verbose = 1;
break;
+
+ case 'z':
+ zmq_endpoint = strdup(optarg);
+ break;
+
+ case 'E':
+ enc_repeat = atoi(optarg);
+ break;
+
+ case 'D':
+ dec_repeat = atoi(optarg);
+ break;
}
}
@@ -133,9 +154,35 @@ int main(int argc, char *argv[]) {
exit(1);
}
+ if (zmq_endpoint) {
+ zmq_context = zmq_ctx_new();
+ if (zmq_context == NULL) {
+ printf("Unable to initialize ZMQ zmq_context");
+ exit(1);
+ }
+
+ zmq_sock = zmq_socket(zmq_context, ZMQ_PUB);
+ if (zmq_sock == NULL) {
+ printf("Unable to create ZMQ socket");
+ exit(1);
+ }
+
+ if (zmq_endpoint[strlen(zmq_endpoint) - 1] == 'c') {
+ /* Collector mode */
+ if (zmq_connect(zmq_sock, zmq_endpoint) != 0)
+ printf("Unable to connect to ZMQ socket %s: %s\n", zmq_endpoint, strerror(errno));
+ } else {
+ /* Probe mode */
+ if (zmq_bind(zmq_sock, zmq_endpoint) != 0) {
+ printf("Unable to bind to ZMQ socket %s: %s\n", zmq_endpoint, strerror(errno));
+ exit(1);
+ }
+ }
+ }
+
/* JSON Import */
- pair<char *, size_t> p = get_corpus(argv[argc - 1]);
+ pair<char *, size_t> p = get_corpus(json_path);
enum json_tokener_error jerr = json_tokener_success;
char * buffer = (char *) malloc(p.second);
@@ -161,7 +208,7 @@ int main(int argc, char *argv[]) {
total_time_usec = 0;
- for (int r = 0; r < repeat; r++) {
+ for (int r = 0; r < enc_repeat; r++) {
gettimeofday(&t1, NULL);
@@ -175,13 +222,25 @@ int main(int argc, char *argv[]) {
json_to_tlv(f, &serializer[0]);
}
+ if (zmq_sock) {
+ for(i = 0; i < num_elements; i++) {
+ struct zmq_msg_hdr msg_hdr;
+ strncpy(msg_hdr.url, "flow", sizeof(msg_hdr.url));
+ msg_hdr.version = 3;
+ msg_hdr.size = serializer[i].size_used;
+ zmq_send(zmq_sock, &msg_hdr, sizeof(msg_hdr), ZMQ_SNDMORE);
+ rc = zmq_send(zmq_sock, serializer[i].buffer, msg_hdr.size, 0);
+ if (rc > 0)
+ exported_flows++;
+ }
+ }
+
gettimeofday(&t2, NULL);
total_time_usec += (u_int64_t) ((u_int64_t) t2.tv_sec * 1000000 + t2.tv_usec) - ((u_int64_t) t1.tv_sec * 1000000 + t1.tv_usec);
-
}
- printf("Serialization perf (includes json-c overhead): %.3f msec total time for %u iterations\n", (double) total_time_usec/1000, repeat);
+ printf("Serialization perf (includes json-c overhead): %.3f msec total time for %u iterations\n", (double) total_time_usec/1000, enc_repeat);
json_object_put(f);
@@ -189,7 +248,7 @@ int main(int argc, char *argv[]) {
total_time_usec = 0;
- for (int r = 0; r < repeat; r++) {
+ for (int r = 0; r < dec_repeat; r++) {
gettimeofday(&t1, NULL);
@@ -238,25 +297,33 @@ int main(int argc, char *argv[]) {
break;
default:
- goto exit;
+ goto close_record;
break;
}
}
- exit:
- if (verbose) printf("\n");
- }
+ close_record:
+
+ if (verbose) printf("\n");
+ }
gettimeofday(&t2, NULL);
total_time_usec += (u_int64_t) ((u_int64_t) t2.tv_sec * 1000000 + t2.tv_usec) - ((u_int64_t) t1.tv_sec * 1000000 + t1.tv_usec);
}
- printf("Deserialization perf: %.3f msec total time for %u iterations\n", (double) total_time_usec/1000, repeat);
+ printf("Deserialization perf: %.3f msec total time for %u iterations\n", (double) total_time_usec/1000, dec_repeat);
+
+ if (zmq_sock)
+ printf("%u total flows exported over ZMQ\n", exported_flows);
for (i = 0; i < num_elements; i++)
ndpi_term_serializer(&serializer[i]);
+ if (zmq_sock != NULL) zmq_close(zmq_sock);
+ if (zmq_context != NULL) zmq_ctx_destroy(zmq_context);
+ if (zmq_endpoint) free(zmq_endpoint);
+
return 0;
}