diff options
Diffstat (limited to 'example/serializer/json2tlv.cpp')
-rw-r--r-- | example/serializer/json2tlv.cpp | 99 |
1 files changed, 83 insertions, 16 deletions
diff --git a/example/serializer/json2tlv.cpp b/example/serializer/json2tlv.cpp index 2d7a4b0ef..8d43e4ec0 100644 --- a/example/serializer/json2tlv.cpp +++ b/example/serializer/json2tlv.cpp @@ -6,9 +6,16 @@ #include "json.h" #include "ndpi_main.h" +#include <zmq.h> using namespace std; +struct zmq_msg_hdr { + char url[32]; + u_int32_t version; + u_int32_t size; +}; + static pair<char *, size_t> get_corpus(string filename) { ifstream is(filename, ios::binary); @@ -25,7 +32,7 @@ static pair<char *, size_t> get_corpus(string filename) { return make_pair((char *)aligned_buffer, length); } - cerr << "JSON file not found or empty\n"; + cerr << "JSON file " << filename << "not found or empty\n"; exit(1); } @@ -93,23 +100,25 @@ void json_to_tlv(json_object * jobj, ndpi_serializer *serializer) { } void print_help(char *bin) { - cerr << "Usage: " << bin << " -v -i <JSON file>\n"; + cerr << "Usage: " << bin << " -i <JSON file> [-z <ZMQ endpoint>] [-E <num encoding loops] [-D <num decoding loop>] [-v]\n"; cerr << "Note: the JSON file should contain an array of records\n"; } int main(int argc, char *argv[]) { char *json_path = NULL; - int repeat = 1; + char* zmq_endpoint = NULL; + void *zmq_sock = NULL; + void *zmq_context = NULL; + int enc_repeat = 1, dec_repeat = 1; int verbose = 0; struct timeval t1, t2; uint64_t total_time_usec; ndpi_serializer *serializer; ndpi_serializer deserializer; - int i, num_elements; - int rc; + int rc, i, num_elements, exported_flows = 0; char c; - while ((c = getopt(argc, argv,"hi:v")) != '?') { + while ((c = getopt(argc, argv,"hi:vz:E:D:")) != '?') { if (c == (char) 255 || c == -1) break; switch(c) { @@ -125,6 +134,18 @@ int main(int argc, char *argv[]) { case 'v': verbose = 1; break; + + case 'z': + zmq_endpoint = strdup(optarg); + break; + + case 'E': + enc_repeat = atoi(optarg); + break; + + case 'D': + dec_repeat = atoi(optarg); + break; } } @@ -133,9 +154,35 @@ int main(int argc, char *argv[]) { exit(1); } + if (zmq_endpoint) { + zmq_context = zmq_ctx_new(); + if (zmq_context == NULL) { + printf("Unable to initialize ZMQ zmq_context"); + exit(1); + } + + zmq_sock = zmq_socket(zmq_context, ZMQ_PUB); + if (zmq_sock == NULL) { + printf("Unable to create ZMQ socket"); + exit(1); + } + + if (zmq_endpoint[strlen(zmq_endpoint) - 1] == 'c') { + /* Collector mode */ + if (zmq_connect(zmq_sock, zmq_endpoint) != 0) + printf("Unable to connect to ZMQ socket %s: %s\n", zmq_endpoint, strerror(errno)); + } else { + /* Probe mode */ + if (zmq_bind(zmq_sock, zmq_endpoint) != 0) { + printf("Unable to bind to ZMQ socket %s: %s\n", zmq_endpoint, strerror(errno)); + exit(1); + } + } + } + /* JSON Import */ - pair<char *, size_t> p = get_corpus(argv[argc - 1]); + pair<char *, size_t> p = get_corpus(json_path); enum json_tokener_error jerr = json_tokener_success; char * buffer = (char *) malloc(p.second); @@ -161,7 +208,7 @@ int main(int argc, char *argv[]) { total_time_usec = 0; - for (int r = 0; r < repeat; r++) { + for (int r = 0; r < enc_repeat; r++) { gettimeofday(&t1, NULL); @@ -175,13 +222,25 @@ int main(int argc, char *argv[]) { json_to_tlv(f, &serializer[0]); } + if (zmq_sock) { + for(i = 0; i < num_elements; i++) { + struct zmq_msg_hdr msg_hdr; + strncpy(msg_hdr.url, "flow", sizeof(msg_hdr.url)); + msg_hdr.version = 3; + msg_hdr.size = serializer[i].size_used; + zmq_send(zmq_sock, &msg_hdr, sizeof(msg_hdr), ZMQ_SNDMORE); + rc = zmq_send(zmq_sock, serializer[i].buffer, msg_hdr.size, 0); + if (rc > 0) + exported_flows++; + } + } + gettimeofday(&t2, NULL); total_time_usec += (u_int64_t) ((u_int64_t) t2.tv_sec * 1000000 + t2.tv_usec) - ((u_int64_t) t1.tv_sec * 1000000 + t1.tv_usec); - } - printf("Serialization perf (includes json-c overhead): %.3f msec total time for %u iterations\n", (double) total_time_usec/1000, repeat); + printf("Serialization perf (includes json-c overhead): %.3f msec total time for %u iterations\n", (double) total_time_usec/1000, enc_repeat); json_object_put(f); @@ -189,7 +248,7 @@ int main(int argc, char *argv[]) { total_time_usec = 0; - for (int r = 0; r < repeat; r++) { + for (int r = 0; r < dec_repeat; r++) { gettimeofday(&t1, NULL); @@ -238,25 +297,33 @@ int main(int argc, char *argv[]) { break; default: - goto exit; + goto close_record; break; } } - exit: - if (verbose) printf("\n"); - } + close_record: + + if (verbose) printf("\n"); + } gettimeofday(&t2, NULL); total_time_usec += (u_int64_t) ((u_int64_t) t2.tv_sec * 1000000 + t2.tv_usec) - ((u_int64_t) t1.tv_sec * 1000000 + t1.tv_usec); } - printf("Deserialization perf: %.3f msec total time for %u iterations\n", (double) total_time_usec/1000, repeat); + printf("Deserialization perf: %.3f msec total time for %u iterations\n", (double) total_time_usec/1000, dec_repeat); + + if (zmq_sock) + printf("%u total flows exported over ZMQ\n", exported_flows); for (i = 0; i < num_elements; i++) ndpi_term_serializer(&serializer[i]); + if (zmq_sock != NULL) zmq_close(zmq_sock); + if (zmq_context != NULL) zmq_ctx_destroy(zmq_context); + if (zmq_endpoint) free(zmq_endpoint); + return 0; } |