Diffstat (limited to 'examples/py-machine-learning')

-rwxr-xr-x  examples/py-machine-learning/keras-autoencoder.py     | 384
-rw-r--r--  examples/py-machine-learning/requirements.txt         |   7
-rwxr-xr-x  examples/py-machine-learning/sklearn-random-forest.py | 352

3 files changed, 743 insertions, 0 deletions
diff --git a/examples/py-machine-learning/keras-autoencoder.py b/examples/py-machine-learning/keras-autoencoder.py
new file mode 100755
index 000000000..a99cc1b2d
--- /dev/null
+++ b/examples/py-machine-learning/keras-autoencoder.py
@@ -0,0 +1,384 @@
+#!/usr/bin/env python3
+
+import base64
+import binascii
+import datetime as dt
+import math
+import matplotlib.animation as ani
+import matplotlib.pyplot as plt
+import multiprocessing as mp
+import numpy as np
+import os
+import queue
+import sys
+
+import tensorflow as tf
+from tensorflow.keras import models, layers, preprocessing
+from tensorflow.keras.layers import Embedding, Masking, Input, Dense
+from tensorflow.keras.models import Model
+from tensorflow.keras.utils import plot_model
+from tensorflow.keras.losses import MeanSquaredError, KLDivergence
+from tensorflow.keras.optimizers import Adam, SGD
+from tensorflow.keras.callbacks import TensorBoard, EarlyStopping
+
+sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
+sys.path.append(os.path.dirname(sys.argv[0]) + '/../share/nDPId')
+sys.path.append(os.path.dirname(sys.argv[0]))
+sys.path.append(sys.base_prefix + '/share/nDPId')
+import nDPIsrvd
+from nDPIsrvd import nDPIsrvdSocket, TermColor
+
+INPUT_SIZE = nDPIsrvd.nDPId_PACKETS_PLEN_MAX
+LATENT_SIZE = 8
+TRAINING_SIZE = 512
+EPOCH_COUNT = 3
+BATCH_SIZE = 16
+LEARNING_RATE = 0.000001
+ES_PATIENCE = 3
+PLOT = False
+PLOT_HISTORY = 100
+TENSORBOARD = False
+TB_LOGPATH = 'logs/' + dt.datetime.now().strftime("%Y%m%d-%H%M%S")
+VAE_USE_KLDIV = False
+VAE_USE_SGD = False
+
+def generate_autoencoder():
+    # TODO: The current model handles *each* packet separately.
+    #       In fact, depending on the nDPId settings (nDPId_PACKETS_PER_FLOW_TO_SEND),
+    #       packets can be related to each other. The accuracy may (or may not)
+    #       improve significantly, but some changes to the code are required.
+    input_i = Input(shape=(), name='input_i')
+    input_e = Embedding(input_dim=INPUT_SIZE, output_dim=INPUT_SIZE, mask_zero=True, name='input_e')(input_i)
+    masked_e = Masking(mask_value=0.0, name='masked_e')(input_e)
+    encoded_h1 = Dense(4096, activation='relu', name='encoded_h1')(masked_e)
+    encoded_h2 = Dense(2048, activation='relu', name='encoded_h2')(encoded_h1)
+    encoded_h3 = Dense(1024, activation='relu', name='encoded_h3')(encoded_h2)
+    encoded_h4 = Dense(512, activation='relu', name='encoded_h4')(encoded_h3)
+    encoded_h5 = Dense(128, activation='relu', name='encoded_h5')(encoded_h4)
+    encoded_h6 = Dense(64, activation='relu', name='encoded_h6')(encoded_h5)
+    encoded_h7 = Dense(32, activation='relu', name='encoded_h7')(encoded_h6)
+    latent = Dense(LATENT_SIZE, activation='relu', name='latent')(encoded_h7)
+
+    input_l = Input(shape=(LATENT_SIZE,), name='input_l')
+    decoder_h1 = Dense(32, activation='relu', name='decoder_h1')(input_l)
+    decoder_h2 = Dense(64, activation='relu', name='decoder_h2')(decoder_h1)
+    decoder_h3 = Dense(128, activation='relu', name='decoder_h3')(decoder_h2)
+    decoder_h4 = Dense(512, activation='relu', name='decoder_h4')(decoder_h3)
+    decoder_h5 = Dense(1024, activation='relu', name='decoder_h5')(decoder_h4)
+    decoder_h6 = Dense(2048, activation='relu', name='decoder_h6')(decoder_h5)
+    decoder_h7 = Dense(4096, activation='relu', name='decoder_h7')(decoder_h6)
+    output_i = Dense(INPUT_SIZE, activation='sigmoid', name='output_i')(decoder_h7)
+
+    # Functional models must be built from Input tensors, not intermediate ones.
+    encoder = Model(input_i, latent, name='encoder')
+    decoder = Model(input_l, output_i, name='decoder')
+    return KLDivergence() if VAE_USE_KLDIV else MeanSquaredError(), \
+           SGD() if VAE_USE_SGD else Adam(learning_rate=LEARNING_RATE), \
+           Model(input_i, decoder(encoder(input_i)), name='VAE')
+
+def compile_autoencoder():
+    loss, optimizer, autoencoder = generate_autoencoder()
+    autoencoder.compile(loss=loss, optimizer=optimizer, metrics=[])
+    return autoencoder
+
+def get_autoencoder(load_from_file=None):
+    if load_from_file is None:
+        autoencoder = compile_autoencoder()
+    else:
+        autoencoder = models.load_model(load_from_file)
+
+    encoder_submodel = autoencoder.layers[1]
+    decoder_submodel = autoencoder.layers[2]
+    return encoder_submodel, decoder_submodel, autoencoder
+
+def on_json_line(json_dict, instance, current_flow, global_user_data):
+    if 'packet_event_name' not in json_dict:
+        return True
+
+    if json_dict['packet_event_name'] != 'packet' and \
+       json_dict['packet_event_name'] != 'packet-flow':
+        return True
+
+    shutdown_event, training_event, padded_pkts, print_dots = global_user_data
+    if shutdown_event.is_set():
+        return False
+
+    try:
+        buf = base64.b64decode(json_dict['pkt'], validate=True)
+    except binascii.Error as err:
+        sys.stderr.write('\nBase64 Exception: {}\n'.format(str(err)))
+        sys.stderr.write('Affected JSON: {}\n'.format(str(json_dict)))
+        sys.stderr.flush()
+        return False
+
+    # Generate a decimal byte buffer with values from 0-255.
+    int_buf = []
+    for v in buf:
+        int_buf.append(int(v))
+
+    mat = np.array([int_buf], dtype='float64')
+
+    # Normalize the values.
+    mat = mat.astype('float64') / 255.0
+
+    # Mean removal.
+    matmean = np.mean(mat, dtype='float64')
+    mat -= matmean
+
+    # Pad the resulting matrix.
+    buf = preprocessing.sequence.pad_sequences(mat, padding="post", maxlen=INPUT_SIZE, truncating='post', dtype='float64')
+    padded_pkts.put(buf[0])
+
+    #print(list(buf[0]))
+
+    if not training_event.is_set():
+        # print_dots is a one-element list so that increments persist
+        # across callback invocations.
+        sys.stdout.write('.' * print_dots[0])
+        sys.stdout.flush()
+        print_dots[0] = 1
+    else:
+        print_dots[0] += 1
+
+    return True
+
+def ndpisrvd_worker(address, shared_shutdown_event, shared_training_event, shared_packet_list):
+    nsock = nDPIsrvdSocket()
+
+    try:
+        nsock.connect(address)
+        print_dots = [1]
+        nsock.loop(on_json_line, None, (shared_shutdown_event, shared_training_event, shared_packet_list, print_dots))
+    except nDPIsrvd.SocketConnectionBroken as err:
+        sys.stderr.write('\nnDPIsrvd-Worker Socket Error: {}\n'.format(err))
+    except KeyboardInterrupt:
+        sys.stderr.write('\n')
+    except Exception as err:
+        sys.stderr.write('\nnDPIsrvd-Worker Exception: {}\n'.format(str(err)))
+        sys.stderr.flush()
+
+    shared_shutdown_event.set()
+
+def keras_worker(load_model, save_model, shared_shutdown_event, shared_training_event, shared_packet_queue, shared_plot_queue):
+    shared_training_event.set()
+    try:
+        encoder, _, autoencoder = get_autoencoder(load_model)
+    except Exception as err:
+        sys.stderr.write('Could not load Keras model from file: {}\n'.format(str(err)))
+        sys.stderr.flush()
+        encoder, _, autoencoder = get_autoencoder()
+    autoencoder.summary()
+    additional_callbacks = []
+    if TENSORBOARD is True:
+        tensorboard = TensorBoard(log_dir=TB_LOGPATH, histogram_freq=1)
+        additional_callbacks += [tensorboard]
+    early_stopping = EarlyStopping(monitor='val_loss', min_delta=0.0001, patience=ES_PATIENCE, restore_best_weights=True, start_from_epoch=0, verbose=0, mode='auto')
+    additional_callbacks += [early_stopping]
+    shared_training_event.clear()
+
+    try:
+        packets = list()
+        while not shared_shutdown_event.is_set():
+            try:
+                packet = shared_packet_queue.get(timeout=1)
+            except queue.Empty:
+                packet = None
+
+            if packet is None:
+                continue
+
+            packets.append(packet)
+            if len(packets) % TRAINING_SIZE == 0:
+                shared_training_event.set()
+                print('\nGot {} packets, training..'.format(len(packets)))
+                tmp = np.array(packets)
+                history = autoencoder.fit(
+                    tmp, tmp, epochs=EPOCH_COUNT, batch_size=BATCH_SIZE,
+                    validation_split=0.2,
+                    shuffle=True,
+                    callbacks=additional_callbacks
+                )
+                reconstructed_data = autoencoder.predict(tmp)
+                mse = np.mean(np.square(tmp - reconstructed_data))
+                # Heuristic "accuracy": the inverse of the reconstruction MSE.
+                reconstruction_accuracy = (1.0 / mse)
+                encoded_data = encoder.predict(tmp)
+                # The latent activations are the encoder output itself.
+                latent_activations = encoded_data
+                shared_plot_queue.put((reconstruction_accuracy, history.history['val_loss'], encoded_data[:, 0], encoded_data[:, 1], latent_activations))
+                packets.clear()
+                shared_training_event.clear()
+    except KeyboardInterrupt:
+        sys.stderr.write('\n')
+    except Exception as err:
+        if len(str(err)) == 0:
+            err = 'Unknown'
+        sys.stderr.write('\nKeras-Worker Exception: {}\n'.format(str(err)))
+        sys.stderr.flush()
+
+    if save_model is not None:
+        sys.stderr.write('Saving model to {}\n'.format(save_model))
+        sys.stderr.flush()
+        autoencoder.save(save_model)
+
+    try:
+        shared_shutdown_event.set()
+    except Exception:
+        pass
+
+def plot_animate(i, shared_plot_queue, ax, xs, ys):
+    if not shared_plot_queue.empty():
+        accuracy, loss, encoded_data0, encoded_data1, latent_activations = shared_plot_queue.get(timeout=1)
+        epochs = len(loss)
+        loss_mean = sum(loss) / epochs
+    else:
+        return
+
+    (ax1, ax2, ax3, ax4) = ax
+    (ys1, ys2, ys3, ys4) = ys
+
+    if len(xs) == 0:
+        xs.append(epochs)
+    else:
+        xs.append(xs[-1] + epochs)
+    ys1.append(accuracy)
+    ys2.append(loss_mean)
+
+    # Trim in place so the history limit persists across invocations.
+    xs[:] = xs[-PLOT_HISTORY:]
+    ys1[:] = ys1[-PLOT_HISTORY:]
+    ys2[:] = ys2[-PLOT_HISTORY:]
+
+    ax1.clear()
+    ax1.plot(xs, ys1, '-')
+    ax2.clear()
+    ax2.plot(xs, ys2, '-')
+    ax3.clear()
+    ax3.scatter(encoded_data0, encoded_data1, marker='.')
+    ax4.clear()
+    ax4.imshow(latent_activations, cmap='viridis', aspect='auto')
+
+    ax1.set_xlabel('Epoch Count')
+    ax1.set_ylabel('Accuracy')
+    ax2.set_xlabel('Epoch Count')
+    ax2.set_ylabel('Validation Loss')
+    ax3.set_title('Latent Space')
+    ax4.set_title('Latent Space Heatmap')
+    ax4.set_xlabel('Latent Dimensions')
+    ax4.set_ylabel('Datapoints')
+
+def plot_worker(shared_shutdown_event, shared_plot_queue):
+    try:
+        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2)
+        fig.tight_layout()
+        ax1.set_xlabel('Epoch Count')
+        ax1.set_ylabel('Accuracy')
+        ax2.set_xlabel('Epoch Count')
+        ax2.set_ylabel('Validation Loss')
+        ax3.set_title('Latent Space')
+        ax4.set_title('Latent Space Heatmap')
+        ax4.set_xlabel('Latent Dimensions')
+        ax4.set_ylabel('Datapoints')
+        xs = []
+        ys1 = []
+        ys2 = []
+        ys3 = []
+        ys4 = []
+        # Keep a reference to the animation, otherwise it may be garbage collected.
+        anim = ani.FuncAnimation(fig, plot_animate, fargs=(shared_plot_queue, (ax1, ax2, ax3, ax4), xs, (ys1, ys2, ys3, ys4)), interval=1000, cache_frame_data=False)
+        plt.subplots_adjust(left=0.05, right=0.95, top=0.95, bottom=0.05)
+        plt.margins(x=0, y=0)
+        plt.show()
+    except Exception as err:
+        sys.stderr.write('\nPlot-Worker Exception: {}\n'.format(str(err)))
+        sys.stderr.flush()
+        shared_shutdown_event.set()
+        return
+
+if __name__ == '__main__':
+    sys.stderr.write('\n***************\n')
+    sys.stderr.write('*** WARNING ***\n')
+    sys.stderr.write('***************\n')
+    sys.stderr.write('\nThis is an immature Autoencoder example.\n')
+    sys.stderr.write('Please do not rely on any of its output!\n\n')
+
+    argparser = nDPIsrvd.defaultArgumentParser()
+    argparser.add_argument('--load-model', action='store',
+                           help='Load a pre-trained model file.')
+    argparser.add_argument('--save-model', action='store',
+                           help='Save the trained model to a file.')
+    argparser.add_argument('--training-size', action='store', type=int, default=TRAINING_SIZE,
+                           help='Set the amount of captured packets required to start the training phase.')
+    argparser.add_argument('--batch-size', action='store', type=int, default=BATCH_SIZE,
+                           help='Set the batch size used for the training phase.')
+    argparser.add_argument('--learning-rate', action='store', type=float, default=LEARNING_RATE,
+                           help='Set the (initial) learning rate for the optimizer.')
+    argparser.add_argument('--plot', action='store_true', default=PLOT,
+                           help='Show some model metrics using pyplot.')
+    argparser.add_argument('--plot-history', action='store', type=int, default=PLOT_HISTORY,
+                           help='Set the history size of line plots. Requires --plot.')
+    argparser.add_argument('--tensorboard', action='store_true', default=TENSORBOARD,
+                           help='Enable TensorBoard compatible logging callback.')
+    argparser.add_argument('--tensorboard-logpath', action='store', default=TB_LOGPATH,
+                           help='TensorBoard logging path.')
+    argparser.add_argument('--use-sgd', action='store_true', default=VAE_USE_SGD,
+                           help='Use SGD optimizer instead of Adam.')
+    argparser.add_argument('--use-kldiv', action='store_true', default=VAE_USE_KLDIV,
+                           help='Use Kullback-Leibler loss function instead of Mean-Squared-Error.')
+    argparser.add_argument('--patience', action='store', type=int, default=ES_PATIENCE,
+                           help='Epoch value for EarlyStopping: abort VAE fitting if no improvement was achieved for that many epochs.')
+    args = argparser.parse_args()
+    address = nDPIsrvd.validateAddress(args)
+
+    LEARNING_RATE = args.learning_rate
+    TRAINING_SIZE = args.training_size
+    BATCH_SIZE = args.batch_size
+    PLOT = args.plot
+    PLOT_HISTORY = args.plot_history
+    TENSORBOARD = args.tensorboard
+    TB_LOGPATH = args.tensorboard_logpath if args.tensorboard_logpath is not None else TB_LOGPATH
+    VAE_USE_SGD = args.use_sgd
+    VAE_USE_KLDIV = args.use_kldiv
+    ES_PATIENCE = args.patience
+
+    sys.stderr.write('Recv buffer size: {}\n'.format(nDPIsrvd.NETWORK_BUFFER_MAX_SIZE))
+    sys.stderr.write('Connecting to {} ..\n'.format(address[0]+':'+str(address[1]) if type(address) is tuple else address))
+    sys.stderr.write('PLOT={}, PLOT_HISTORY={}, LEARNING_RATE={}, TRAINING_SIZE={}, BATCH_SIZE={}\n\n'.format(PLOT, PLOT_HISTORY, LEARNING_RATE, TRAINING_SIZE, BATCH_SIZE))
+
+    mgr = mp.Manager()
+
+    shared_training_event = mgr.Event()
+    shared_training_event.clear()
+
+    shared_shutdown_event = mgr.Event()
+    shared_shutdown_event.clear()
+
+    shared_packet_queue = mgr.JoinableQueue()
+    shared_plot_queue = mgr.JoinableQueue()
+
+    nDPIsrvd_job = mp.Process(target=ndpisrvd_worker, args=(
+        address,
+        shared_shutdown_event,
+        shared_training_event,
+        shared_packet_queue
+    ))
+    nDPIsrvd_job.start()
+
+    keras_job = mp.Process(target=keras_worker, args=(
+        args.load_model,
+        args.save_model,
+        shared_shutdown_event,
+        shared_training_event,
+        shared_packet_queue,
+        shared_plot_queue
+    ))
+    keras_job.start()
+
+    if PLOT is True:
+        plot_job = mp.Process(target=plot_worker, args=(shared_shutdown_event, shared_plot_queue))
+        plot_job.start()
+
+    try:
+        shared_shutdown_event.wait()
+    except KeyboardInterrupt:
+        print('\nShutting down worker processes..')
+
+    if PLOT is True:
+        plot_job.terminate()
+        plot_job.join()
+    nDPIsrvd_job.terminate()
+    nDPIsrvd_job.join()
+    keras_job.join(timeout=3)
+    keras_job.terminate()
diff --git a/examples/py-machine-learning/requirements.txt b/examples/py-machine-learning/requirements.txt
new file mode 100644
index 000000000..33cfad38c
--- /dev/null
+++ b/examples/py-machine-learning/requirements.txt
@@ -0,0 +1,7 @@
+joblib
+tensorflow
+scikit-learn
+scipy
+matplotlib
+numpy
+pandas
diff --git a/examples/py-machine-learning/sklearn-random-forest.py b/examples/py-machine-learning/sklearn-random-forest.py
new file mode 100755
index 000000000..07f4049d8
--- /dev/null
+++ b/examples/py-machine-learning/sklearn-random-forest.py
@@ -0,0 +1,352 @@
+#!/usr/bin/env python3
+
+import csv
+import joblib
+import matplotlib.pyplot
+import numpy
+import os
+import pandas
+import sklearn
+import sklearn.ensemble
+import sklearn.inspection
+import sys
+import time
+
+sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
+sys.path.append(os.path.dirname(sys.argv[0]) + '/../share/nDPId')
+sys.path.append(os.path.dirname(sys.argv[0]))
+sys.path.append(sys.base_prefix + '/share/nDPId')
+import nDPIsrvd
+from nDPIsrvd import nDPIsrvdSocket, TermColor
+
+
+N_DIRS = 0
+N_BINS = 0
+
+ENABLE_FEATURE_IAT = False
+ENABLE_FEATURE_PKTLEN = False
+ENABLE_FEATURE_DIRS = True
+ENABLE_FEATURE_BINS = True
+
+PROTO_CLASSES = None
+
+def getFeatures(json):
+    return [json['flow_src_packets_processed'],
+            json['flow_dst_packets_processed'],
+            json['flow_src_tot_l4_payload_len'],
+            json['flow_dst_tot_l4_payload_len']]
+
+def getFeaturesFromArray(json, expected_len=0):
+    if type(json) is str:
+        dirs = numpy.fromstring(json, sep=',', dtype=int)
+        dirs = numpy.asarray(dirs, dtype=int).tolist()
+    elif type(json) is list:
+        dirs = json
+    else:
+        raise TypeError('Invalid type: {}.'.format(type(json)))
+
+    if expected_len > 0 and len(dirs) != expected_len:
+        raise RuntimeError('Invalid array length; Expected {}, Got {}.'.format(expected_len, len(dirs)))
+
+    return dirs
+
+def getRelevantFeaturesCSV(line):
+    ret = list()
+    ret.extend(getFeatures(line))
+    if ENABLE_FEATURE_IAT is True:
+        ret.extend(getFeaturesFromArray(line['iat_data'], N_DIRS - 1))
+    if ENABLE_FEATURE_PKTLEN is True:
+        ret.extend(getFeaturesFromArray(line['pktlen_data'], N_DIRS))
+    if ENABLE_FEATURE_DIRS is True:
+        ret.extend(getFeaturesFromArray(line['directions'], N_DIRS))
+    if ENABLE_FEATURE_BINS is True:
+        ret.extend(getFeaturesFromArray(line['bins_c_to_s'], N_BINS))
+        ret.extend(getFeaturesFromArray(line['bins_s_to_c'], N_BINS))
+    return [ret]
+
+def getRelevantFeaturesJSON(line):
+    ret = list()
+    ret.extend(getFeatures(line))
+    if ENABLE_FEATURE_IAT is True:
+        ret.extend(getFeaturesFromArray(line['data_analysis']['iat']['data'], N_DIRS - 1))
+    if ENABLE_FEATURE_PKTLEN is True:
+        ret.extend(getFeaturesFromArray(line['data_analysis']['pktlen']['data'], N_DIRS))
+    if ENABLE_FEATURE_DIRS is True:
+        ret.extend(getFeaturesFromArray(line['data_analysis']['directions'], N_DIRS))
+    if ENABLE_FEATURE_BINS is True:
+        ret.extend(getFeaturesFromArray(line['data_analysis']['bins']['c_to_s'], N_BINS))
+        ret.extend(getFeaturesFromArray(line['data_analysis']['bins']['s_to_c'], N_BINS))
+    return [ret]
+
+def getRelevantFeatureNames():
+    names = list()
+    names.extend(['flow_src_packets_processed', 'flow_dst_packets_processed',
+                  'flow_src_tot_l4_payload_len', 'flow_dst_tot_l4_payload_len'])
+    if ENABLE_FEATURE_IAT is True:
+        for x in range(N_DIRS - 1):
+            names.append('iat_{}'.format(x))
+    if ENABLE_FEATURE_PKTLEN is True:
+        for x in range(N_DIRS):
+            names.append('pktlen_{}'.format(x))
+    if ENABLE_FEATURE_DIRS is True:
+        for x in range(N_DIRS):
+            names.append('dirs_{}'.format(x))
+    if ENABLE_FEATURE_BINS is True:
+        for x in range(N_BINS):
+            names.append('bins_c_to_s_{}'.format(x))
+        for x in range(N_BINS):
+            names.append('bins_s_to_c_{}'.format(x))
+    return names
+
+def plotPermutatedImportance(model, X, y):
+    result = sklearn.inspection.permutation_importance(model, X, y, n_repeats=10, random_state=42, n_jobs=-1)
+    forest_importances = pandas.Series(result.importances_mean, index=getRelevantFeatureNames())
+
+    fig, ax = matplotlib.pyplot.subplots()
+    forest_importances.plot.bar(yerr=result.importances_std, ax=ax)
+    ax.set_title("Feature importances using permutation on full model")
+    ax.set_ylabel("Mean accuracy decrease")
+    fig.tight_layout()
+    matplotlib.pyplot.show()
+
+def isProtoClass(proto_class, line):
+    if type(proto_class) != list or type(line) != str:
+        raise TypeError('Invalid type: {}/{}.'.format(type(proto_class), type(line)))
+
+    s = line.lower()
+
+    for x in range(len(proto_class)):
+        if s.startswith(proto_class[x].lower()) is True:
+            return x + 1
+
+    return 0
+
+def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
+    if 'flow_event_name' not in json_dict:
+        return True
+    if json_dict['flow_event_name'] != 'analyse':
+        return True
+
+    if 'ndpi' not in json_dict:
+        return True
+    if 'proto' not in json_dict['ndpi']:
+        return True
+
+    #print(json_dict)
+
+    model, proto_class, disable_colors = global_user_data
+
+    try:
+        X = getRelevantFeaturesJSON(json_dict)
+        y = model.predict(X)
+        p = model.predict_log_proba(X)
+
+        if y[0] <= 0:
+            y_text = 'n/a'
+        else:
+            y_text = proto_class[y[0] - 1]
+
+        color_start = ''
+        color_end = ''
+        pred_failed = False
+        if disable_colors is False:
+            if json_dict['ndpi']['proto'].lower().startswith(y_text) is True:
+                color_start = TermColor.BOLD
+                color_end = TermColor.END
+            elif y_text not in proto_class and \
+                 json_dict['ndpi']['proto'].lower() not in proto_class:
+                pass
+            else:
+                pred_failed = True
+                color_start = TermColor.WARNING + TermColor.BOLD
+                color_end = TermColor.END
+
+        probs = str()
+        for i in range(len(p[0])):
+            # Class 0 is the "n/a" class; proto_class[i - 1] is only valid for i > 0.
+            if i > 0 and disable_colors is False and \
+               json_dict['ndpi']['proto'].lower().startswith(proto_class[i - 1]):
+                probs += '{}{:>2.1f}{}, '.format(TermColor.BOLD + TermColor.BLINK if pred_failed is True else '',
+                                                 p[0][i], TermColor.END)
+            elif i == y[0]:
+                probs += '{}{:>2.1f}{}, '.format(color_start, p[0][i], color_end)
+            else:
+                probs += '{:>2.1f}, '.format(p[0][i])
+        probs = probs[:-2]
+
+        print('DPI Engine detected: {}{:>24}{}, Predicted: {}{:>24}{}, Probabilities: {}'.format(
+              color_start, json_dict['ndpi']['proto'].lower(), color_end,
+              color_start, y_text, color_end, probs))
+
+        if pred_failed is True:
+            pclass = isProtoClass(proto_class, json_dict['ndpi']['proto'].lower())
+            if pclass == 0:
+                msg = 'false positive'
+            else:
+                msg = 'false negative'
+
+            print('{:>46} {}{}{}'.format('[-]', TermColor.FAIL + TermColor.BOLD + TermColor.BLINK, msg, TermColor.END))
+
+    except Exception as err:
+        print('Got exception `{}\'\nfor json: {}'.format(err, json_dict))
+
+    return True
+
+if __name__ == '__main__':
+    argparser = nDPIsrvd.defaultArgumentParser()
+    argparser.add_argument('--load-model', action='store',
+                           help='Load a pre-trained model file.')
+    argparser.add_argument('--save-model', action='store',
+                           help='Save the trained model to a file.')
+    argparser.add_argument('--csv', action='store',
+                           help='Input CSV file generated with nDPIsrvd-analysed.')
+    argparser.add_argument('--proto-class', action='append', required=False,
+                           help='nDPId protocol class of interest used for training and prediction. ' +
+                                'Can be specified multiple times. Example: tls.youtube')
+    argparser.add_argument('--generate-feature-importance', action='store_true',
+                           help='Generate the permuted feature importance with matplotlib.')
+    argparser.add_argument('--enable-iat', action='store_true', default=None,
+                           help='Enable packet (I)nter (A)rrival (T)ime for learning and prediction.')
+    argparser.add_argument('--enable-pktlen', action='store_true', default=None,
+                           help='Enable layer 4 packet lengths for learning and prediction.')
+    argparser.add_argument('--disable-dirs', action='store_true', default=None,
+                           help='Disable packet directions for learning and prediction.')
+    argparser.add_argument('--disable-bins', action='store_true', default=None,
+                           help='Disable packet length distribution for learning and prediction.')
+    argparser.add_argument('--disable-colors', action='store_true', default=False,
+                           help='Disable any coloring.')
+    argparser.add_argument('--sklearn-jobs', action='store', type=int, default=1,
+                           help='Number of sklearn processes during training.')
+    argparser.add_argument('--sklearn-estimators', action='store', type=int, default=1000,
+                           help='Number of trees in the forest.')
+    argparser.add_argument('--sklearn-min-samples-leaf', action='store', type=float, default=0.0001,
+                           help='The minimum number of samples required to be at a leaf node.')
+    argparser.add_argument('--sklearn-class-weight', default='balanced', const='balanced', nargs='?',
+                           choices=['balanced', 'balanced_subsample'],
+                           help='Weights associated with the protocol classes.')
+    argparser.add_argument('--sklearn-max-features', default='sqrt', const='sqrt', nargs='?',
+                           choices=['sqrt', 'log2'],
+                           help='The number of features to consider when looking for the best split.')
+    argparser.add_argument('--sklearn-max-depth', action='store', type=int, default=128,
+                           help='The maximum depth of a tree.')
+    argparser.add_argument('--sklearn-verbosity', action='store', type=int, default=0,
+                           help='Controls the verbosity of sklearn\'s random forest classifier.')
+    args = argparser.parse_args()
+    address = nDPIsrvd.validateAddress(args)
+
+    if args.csv is None and args.load_model is None:
+        sys.stderr.write('{}: Either `--csv` or `--load-model` required!\n'.format(sys.argv[0]))
+        sys.exit(1)
+
+    if args.csv is None and args.generate_feature_importance is True:
+        sys.stderr.write('{}: `--generate-feature-importance` requires `--csv`.\n'.format(sys.argv[0]))
+        sys.exit(1)
+
+    if args.proto_class is None or len(args.proto_class) == 0:
+        if args.csv is None and args.load_model is None:
+            sys.stderr.write('{}: `--proto-class` missing, no useful classification can be performed.\n'.format(sys.argv[0]))
+    else:
+        if args.load_model is not None:
+            sys.stderr.write('{}: `--proto-class` set, but you want to load an existing model.\n'.format(sys.argv[0]))
+            sys.exit(1)
+
+    if args.load_model is not None:
+        sys.stderr.write('{}: You are loading an existing model file. ' \
+                         'Some --sklearn-* command line parameters won\'t have any effect!\n'.format(sys.argv[0]))
+
+        if args.enable_iat is not None:
+            sys.stderr.write('{}: `--enable-iat` set, but you want to load an existing model.\n'.format(sys.argv[0]))
+            sys.exit(1)
+        if args.enable_pktlen is not None:
+            sys.stderr.write('{}: `--enable-pktlen` set, but you want to load an existing model.\n'.format(sys.argv[0]))
+            sys.exit(1)
+        if args.disable_dirs is not None:
+            sys.stderr.write('{}: `--disable-dirs` set, but you want to load an existing model.\n'.format(sys.argv[0]))
+            sys.exit(1)
+        if args.disable_bins is not None:
+            sys.stderr.write('{}: `--disable-bins` set, but you want to load an existing model.\n'.format(sys.argv[0]))
+            sys.exit(1)
+
+    ENABLE_FEATURE_IAT = args.enable_iat if args.enable_iat is not None else ENABLE_FEATURE_IAT
+    ENABLE_FEATURE_PKTLEN = args.enable_pktlen if args.enable_pktlen is not None else ENABLE_FEATURE_PKTLEN
+    # The --disable-* flags must invert the feature toggles.
+    ENABLE_FEATURE_DIRS = not args.disable_dirs if args.disable_dirs is not None else ENABLE_FEATURE_DIRS
+    ENABLE_FEATURE_BINS = not args.disable_bins if args.disable_bins is not None else ENABLE_FEATURE_BINS
+    PROTO_CLASSES = args.proto_class
+
+    numpy.set_printoptions(formatter={'float_kind': "{:.1f}".format}, sign=' ')
+    numpy.seterr(divide='ignore')
+
+    if args.proto_class is not None:
+        for i in range(len(args.proto_class)):
+            args.proto_class[i] = args.proto_class[i].lower()
+
+    if args.load_model is not None:
+        sys.stderr.write('Loading model from {}\n'.format(args.load_model))
+        model, options = joblib.load(args.load_model)
+        ENABLE_FEATURE_IAT, ENABLE_FEATURE_PKTLEN, ENABLE_FEATURE_DIRS, ENABLE_FEATURE_BINS, args.proto_class = options
+
+    if args.csv is not None:
+        sys.stderr.write('Learning via CSV..\n')
+        with open(args.csv, newline='\n') as csvfile:
+            reader = csv.DictReader(csvfile, delimiter=',', quotechar='"')
+            X = list()
+            y = list()
+
+            # NOTE: The first CSV row is consumed only to determine N_DIRS/N_BINS.
+            for line in reader:
+                N_DIRS = len(getFeaturesFromArray(line['directions']))
+                N_BINS = len(getFeaturesFromArray(line['bins_c_to_s']))
+                break
+
+            for line in reader:
+                try:
+                    X += getRelevantFeaturesCSV(line)
+                except RuntimeError as err:
+                    print('Runtime Error: `{}\'\non line {}: {}'.format(err, reader.line_num - 1, line))
+                    continue
+                except TypeError as err:
+                    print('Type Error: `{}\'\non line {}: {}'.format(err, reader.line_num - 1, line))
+                    continue
+
+                try:
+                    y += [isProtoClass(args.proto_class, line['proto'])]
+                except TypeError as err:
+                    X.pop()
+                    print('Type Error: `{}\'\non line {}: {}'.format(err, reader.line_num - 1, line))
+                    continue
+
+        sys.stderr.write('CSV data set contains {} entries.\n'.format(len(X)))
+
+        if args.load_model is None:
+            model = sklearn.ensemble.RandomForestClassifier(bootstrap=False,
+                                                            class_weight=args.sklearn_class_weight,
+                                                            n_jobs=args.sklearn_jobs,
+                                                            n_estimators=args.sklearn_estimators,
+                                                            verbose=args.sklearn_verbosity,
+                                                            min_samples_leaf=args.sklearn_min_samples_leaf,
+                                                            max_features=args.sklearn_max_features,
+                                                            max_depth=args.sklearn_max_depth)
+        options = (ENABLE_FEATURE_IAT, ENABLE_FEATURE_PKTLEN, ENABLE_FEATURE_DIRS, ENABLE_FEATURE_BINS, args.proto_class)
+        sys.stderr.write('Training model..\n')
+        model.fit(X, y)
+
+        if args.generate_feature_importance is True:
+            sys.stderr.write('Generating feature importance .. this may take some time\n')
+            plotPermutatedImportance(model, X, y)
+
+    if args.save_model is not None:
+        sys.stderr.write('Saving model to {}\n'.format(args.save_model))
+        joblib.dump([model, options], args.save_model)
+
+    print('ENABLE_FEATURE_PKTLEN: {}'.format(ENABLE_FEATURE_PKTLEN))
+    print('ENABLE_FEATURE_BINS..: {}'.format(ENABLE_FEATURE_BINS))
+    print('ENABLE_FEATURE_DIRS..: {}'.format(ENABLE_FEATURE_DIRS))
+    print('ENABLE_FEATURE_IAT...: {}'.format(ENABLE_FEATURE_IAT))
+    print('Map[*] -> [0]')
+    for x in range(len(args.proto_class)):
+        print('Map["{}"] -> [{}]'.format(args.proto_class[x], x + 1))
+
+    sys.stderr.write('Predicting realtime traffic..\n')
+    sys.stderr.write('Recv buffer size: {}\n'.format(nDPIsrvd.NETWORK_BUFFER_MAX_SIZE))
+    sys.stderr.write('Connecting to {} ..\n'.format(address[0]+':'+str(address[1]) if type(address) is tuple else address))
+    nsock = nDPIsrvdSocket()
+    nsock.connect(address)
+    nsock.loop(onJsonLineRecvd, None, (model, args.proto_class, args.disable_colors))
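
For quick sanity checks it can be useful to reload a model produced by sklearn-random-forest.py with --save-model and inspect it offline. The following is a minimal sketch and not part of the commit above; the helper itself and the file name model.joblib are hypothetical, and it relies only on the [model, options] list layout that joblib.dump() persists in the script:

#!/usr/bin/env python3
# Hypothetical inspection helper (not part of this commit).
# Usage: ./inspect-model.py model.joblib
import sys
import joblib

# sklearn-random-forest.py persists a two-element list: [classifier, options].
model, options = joblib.load(sys.argv[1])
iat, pktlen, dirs, bins, proto_class = options

print('Features: iat={}, pktlen={}, dirs={}, bins={}'.format(iat, pktlen, dirs, bins))
print('Map[*] -> [0]')
for i, name in enumerate(proto_class or []):
    print('Map["{}"] -> [{}]'.format(name, i + 1))
print('Trees in forest: {}'.format(len(model.estimators_)))

Predictions with a reloaded model work exactly as in the script itself: build a feature row via getRelevantFeaturesCSV() or getRelevantFeaturesJSON() with the same feature toggles stored in options, then call model.predict() on it.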