author     Toni Uhlig <matzeton@googlemail.com>  2023-08-15 11:21:46 +0200
committer  Toni Uhlig <matzeton@googlemail.com>  2023-08-15 11:21:46 +0200
commit     4b3031245dcf78741c83664fee886825d73f1cb1 (patch)
tree       3cdb51b82d5e00cb2f3a8f0aa9d2569a4347b693
parent     2b881d56e7f9c56689888f8904291e5184526529 (diff)
keras-autoencoder.py: fixed invalid preprocessing of received base64 packet data
* split logic into separate jobs: nDPIsrvd and Keras
* nDPIsrvd: break event processing and re-run `epoll_wait()` after a client disconnected

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rwxr-xr-x  examples/py-machine-learning/keras-autoencoder.py  221
-rw-r--r--  nDPIsrvd.c                                            2
2 files changed, 159 insertions, 64 deletions
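The first bullet of the commit message is a classic producer/consumer split: one process runs the nDPIsrvd socket loop and feeds packet buffers into a shared queue, a second process drains that queue and trains the Keras model in batches. A minimal sketch of that pattern follows, with illustrative names (producer/consumer are stand-ins, not the functions from the diff below):

#!/usr/bin/env python3

import multiprocessing as mp
import queue

def producer(shutdown_event, packet_queue):
    # Stand-in for the nDPIsrvd worker: push packet buffers until done.
    for i in range(100):
        if shutdown_event.is_set():
            break
        packet_queue.put(bytes([i % 256]) * 16)
    shutdown_event.set()

def consumer(shutdown_event, packet_queue):
    # Stand-in for the Keras worker: drain the queue, train in batches.
    batch = []
    while not shutdown_event.is_set() or not packet_queue.empty():
        try:
            batch.append(packet_queue.get(timeout=1))
        except queue.Empty:
            continue
        if len(batch) == 10:
            print('Got {} packets, training..'.format(len(batch)))
            batch.clear()

if __name__ == '__main__':
    mgr = mp.Manager()
    shutdown_event = mgr.Event()
    packet_queue = mgr.JoinableQueue()

    jobs = [mp.Process(target=producer, args=(shutdown_event, packet_queue)),
            mp.Process(target=consumer, args=(shutdown_event, packet_queue))]
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()

Using a manager-backed Event and JoinableQueue keeps the two processes decoupled: the producer never blocks on training, and either side can signal shutdown.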
diff --git a/examples/py-machine-learning/keras-autoencoder.py b/examples/py-machine-learning/keras-autoencoder.py
index 2a115395d..bcb63f30a 100755
--- a/examples/py-machine-learning/keras-autoencoder.py
+++ b/examples/py-machine-learning/keras-autoencoder.py
@@ -1,12 +1,12 @@
 #!/usr/bin/env python3

 import base64
+import binascii
 import joblib
-import csv
-import matplotlib.pyplot as plt
+import multiprocessing as mp
 import numpy as np
 import os
-import pandas as pd
+import queue
 import sys

 sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
@@ -17,29 +17,37 @@ import nDPIsrvd
 from nDPIsrvd import nDPIsrvdSocket, TermColor

 INPUT_SIZE = nDPIsrvd.nDPId_PACKETS_PLEN_MAX
+LATENT_SIZE = 8
 TRAINING_SIZE = 500
+EPOCH_COUNT = 5
 BATCH_SIZE = 10

 def generate_autoencoder():
-    input_i = Input(shape=())
-    input_i = Embedding(input_dim=INPUT_SIZE, output_dim=INPUT_SIZE, mask_zero=True)(input_i)
-    encoded_h1 = Dense(1024, activation='relu', name='input_i')(input_i)
-    encoded_h2 = Dense(512, activation='relu', name='encoded_h1')(encoded_h1)
-    encoded_h3 = Dense(128, activation='relu', name='encoded_h2')(encoded_h2)
-    encoded_h4 = Dense(64, activation='relu', name='encoded_h3')(encoded_h3)
-    encoded_h5 = Dense(32, activation='relu', name='encoded_h4')(encoded_h4)
-    latent = Dense(2, activation='relu', name='encoded_h5')(encoded_h5)
-    decoder_h1 = Dense(32, activation='relu', name='latent')(latent)
-    decoder_h2 = Dense(64, activation='relu', name='decoder_h1')(decoder_h1)
-    decoder_h3 = Dense(128, activation='relu', name='decoder_h2')(decoder_h2)
-    decoder_h4 = Dense(512, activation='relu', name='decoder_h3')(decoder_h3)
-    decoder_h5 = Dense(1024, activation='relu', name='decoder_h4')(decoder_h4)
-    return input_i, Model(input_i, Dense(INPUT_SIZE, activation='sigmoid', name='decoder_h5')(decoder_h5))
+    input_i = Input(shape=(), name='input_i')
+    input_e = Embedding(input_dim=INPUT_SIZE, output_dim=INPUT_SIZE, mask_zero=True, name='input_e')(input_i)
+    encoded_h1 = Dense(1024, activation='relu', name='encoded_h1')(input_e)
+    encoded_h2 = Dense(512, activation='relu', name='encoded_h2')(encoded_h1)
+    encoded_h3 = Dense(128, activation='relu', name='encoded_h3')(encoded_h2)
+    encoded_h4 = Dense(64, activation='relu', name='encoded_h4')(encoded_h3)
+    encoded_h5 = Dense(32, activation='relu', name='encoded_h5')(encoded_h4)
+    latent = Dense(LATENT_SIZE, activation='relu', name='latent')(encoded_h5)
+
+    input_l = Input(shape=(LATENT_SIZE), name='input_l')
+    decoder_h1 = Dense(32, activation='relu', name='decoder_h1')(input_l)
+    decoder_h2 = Dense(64, activation='relu', name='decoder_h2')(decoder_h1)
+    decoder_h3 = Dense(128, activation='relu', name='decoder_h3')(decoder_h2)
+    decoder_h4 = Dense(512, activation='relu', name='decoder_h4')(decoder_h3)
+    decoder_h5 = Dense(1024, activation='relu', name='decoder_h5')(decoder_h4)
+    output_i = Dense(INPUT_SIZE, activation='sigmoid', name='output_i')(decoder_h5)
+
+    encoder = Model(input_e, latent, name='encoder')
+    decoder = Model(input_l, output_i, name='decoder')
+    return encoder, decoder, Model(input_e, decoder(encoder(input_e)), name='VAE')

 def compile_autoencoder():
-    inp, autoencoder = generate_autoencoder()
+    encoder, decoder, autoencoder = generate_autoencoder()
     autoencoder.compile(loss='mse', optimizer='adam', metrics=[tf.keras.metrics.Accuracy()])
-    return inp, autoencoder
+    return encoder, decoder, autoencoder

 def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
     if 'packet_event_name' not in json_dict:
@@ -49,50 +57,122 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
        json_dict['packet_event_name'] != 'packet-flow':
         return True

-    _, padded_pkts = global_user_data
-    buf = base64.b64decode(json_dict['pkt'], validate=True)
+    shutdown_event, training_event, padded_pkts = global_user_data
+    if shutdown_event.is_set():
+        return False
+
+    try:
+        buf = base64.b64decode(json_dict['pkt'], validate=True)
+    except binascii.Error as err:
+        sys.stderr.write('\nBase64 Exception: {}\n'.format(str(err)))
+        sys.stderr.write('Affected JSON: {}\n'.format(str(json_dict)))
+        sys.stderr.flush()
+        return False

     # Generate decimal byte buffer with values from 0-255
     int_buf = []
     for v in buf:
         int_buf.append(int(v))

-    mat = np.array([int_buf])
+    mat = np.array([int_buf], dtype='float64')

     # Normalize the values
-    mat = mat.astype('float32') / 255.
+    mat = mat.astype('float64') / 255.0

     # Mean removal
-    matmean = np.mean(mat, axis=0)
+    matmean = np.mean(mat, dtype='float64')
     mat -= matmean

     # Pad resulting matrix
-    buf = preprocessing.sequence.pad_sequences(mat, padding="post", maxlen=INPUT_SIZE, truncating='post')
-    padded_pkts.append(buf[0])
-
-    sys.stdout.write('.')
-    sys.stdout.flush()
-    if (len(padded_pkts) % TRAINING_SIZE == 0):
-        print('\nGot {} packets, training..'.format(len(padded_pkts)))
-        tmp = np.array(padded_pkts)
-        history = autoencoder.fit(
-            tmp, tmp, epochs=10, batch_size=BATCH_SIZE,
-            validation_split=0.2,
-            shuffle=True
-        )
-        padded_pkts.clear()
-
-        #plot_model(autoencoder, show_shapes=True, show_layer_names=True)
-        #plt.plot(history.history['loss'])
-        #plt.plot(history.history['val_loss'])
-        #plt.title('model loss')
-        #plt.xlabel('loss')
-        #plt.ylabel('val_loss')
-        #plt.legend(['loss', 'val_loss'], loc='upper left')
-        #plt.show()
+    buf = preprocessing.sequence.pad_sequences(mat, padding="post", maxlen=INPUT_SIZE, truncating='post', dtype='float64')
+    padded_pkts.put(buf[0])
+
+    #print(list(buf[0]))
+
+    if not training_event.is_set():
+        sys.stdout.write('.')
+        sys.stdout.flush()

     return True
+def nDPIsrvd_worker(address, shared_shutdown_event, shared_training_event, shared_packet_list):
+    nsock = nDPIsrvdSocket()
+
+    try:
+        nsock.connect(address)
+        padded_pkts = list()
+        nsock.loop(onJsonLineRecvd, None, (shared_shutdown_event, shared_training_event, shared_packet_list))
+    except nDPIsrvd.SocketConnectionBroken as err:
+        sys.stderr.write('\nnDPIsrvd-Worker Socket Error: {}\n'.format(err))
+    except KeyboardInterrupt:
+        sys.stderr.write('\n')
+    except Exception as err:
+        sys.stderr.write('\nnDPIsrvd-Worker Exception: {}\n'.format(str(err)))
+        sys.stderr.flush()
+
+    shared_shutdown_event.set()
+
+def keras_worker(load_model, save_model, shared_shutdown_event, shared_training_event, shared_packet_queue):
+    shared_training_event.set()
+    if load_model is not None:
+        sys.stderr.write('Loading model from {}\n'.format(load_model))
+        sys.stderr.flush()
+        try:
+            encoder, decoder, autoencoder = joblib.load(load_model)
+        except:
+            sys.stderr.write('Could not load model from {}\n'.format(load_model))
+            sys.stderr.write('Compiling new Autoencoder..\n')
+            sys.stderr.flush()
+            encoder, decoder, autoencoder = compile_autoencoder()
+    else:
+        encoder, decoder, autoencoder = compile_autoencoder()
+    decoder.summary()
+    encoder.summary()
+    autoencoder.summary()
+    shared_training_event.clear()
+
+    try:
+        packets = list()
+        while not shared_shutdown_event.is_set():
+            try:
+                packet = shared_packet_queue.get(timeout=1)
+            except queue.Empty:
+                packet = None
+
+            if packet is None:
+                continue
+
+            packets.append(packet)
+            if len(packets) % TRAINING_SIZE == 0:
+                shared_training_event.set()
+                print('\nGot {} packets, training..'.format(len(packets)))
+                tmp = np.array(packets)
+                x_test_encoded = encoder.predict(tmp, batch_size=BATCH_SIZE)
+                history = autoencoder.fit(
+                    tmp, tmp, epochs=EPOCH_COUNT, batch_size=BATCH_SIZE,
+                    validation_split=0.2,
+                    shuffle=True
+                )
+                packets.clear()
+                shared_training_event.clear()
+    except KeyboardInterrupt:
+        sys.stderr.write('\n')
+    except Exception as err:
+        if len(str(err)) == 0:
+            err = 'Unknown'
+        sys.stderr.write('\nKeras-Worker Exception: {}\n'.format(str(err)))
+        sys.stderr.flush()
+
+    if save_model is not None:
+        sys.stderr.write('Saving model to {}\n'.format(save_model))
+        sys.stderr.flush()
+        joblib.dump([encoder, decoder, autoencoder], save_model)
+
+    try:
+        shared_shutdown_event.set()
+    except:
+        pass
+
 if __name__ == '__main__':
     sys.stderr.write('\b\n***************\n')
     sys.stderr.write('*** WARNING ***\n')
@@ -125,23 +205,38 @@ if __name__ == '__main__':
     from tensorflow.keras.models import Model, Sequential
     from tensorflow.keras.utils import plot_model

-    if args.load_model is not None:
-        sys.stderr.write('Loading model from {}\n'.format(args.load_model))
-        autoencoder, options = joblib.load(args.load_model)
-    else:
-        _, autoencoder = compile_autoencoder()
-    autoencoder.summary()
+    mgr = mp.Manager()
+
+    shared_training_event = mgr.Event()
+    shared_training_event.clear()
+
+    shared_shutdown_event = mgr.Event()
+    shared_shutdown_event.clear()
+
+    shared_packet_queue = mgr.JoinableQueue()
+
+    nDPIsrvd_job = mp.Process(target=nDPIsrvd_worker, args=(
+        address,
+        shared_shutdown_event,
+        shared_training_event,
+        shared_packet_queue
+    ))
+    nDPIsrvd_job.start()
+
+    keras_job = mp.Process(target=keras_worker, args=(
+        args.load_model,
+        args.save_model,
+        shared_shutdown_event,
+        shared_training_event,
+        shared_packet_queue
+    ))
+    keras_job.start()

-    nsock = nDPIsrvdSocket()
-    nsock.connect(address)
     try:
-        padded_pkts = list()
-        nsock.loop(onJsonLineRecvd, None, (autoencoder, padded_pkts))
-    except nDPIsrvd.SocketConnectionBroken as err:
-        sys.stderr.write('\n{}\n'.format(err))
+        shared_shutdown_event.wait()
     except KeyboardInterrupt:
-        print()
+        print('\nShutting down worker processes..')

-    if args.save_model is not None:
-        sys.stderr.write('Saving model to {}\n'.format(args.save_model))
-        joblib.dump([autoencoder, None], args.save_model)
+    nDPIsrvd_job.terminate()
+    nDPIsrvd_job.join()
+    keras_job.join()
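The headline fix of this commit is the packet preprocessing path: the base64 payload is now decoded defensively (`validate=True` plus a `binascii.Error` handler instead of letting the worker crash) and the normalize / mean-removal / padding pipeline is kept in float64 throughout. A condensed, standalone version of that corrected path, with a small stand-in `INPUT_SIZE` instead of `nDPIsrvd.nDPId_PACKETS_PLEN_MAX`:

import base64
import binascii

import numpy as np
from tensorflow.keras import preprocessing

INPUT_SIZE = 64  # stand-in; the real script uses nDPIsrvd.nDPId_PACKETS_PLEN_MAX

def preprocess_pkt(b64_pkt):
    # Reject malformed base64 instead of crashing the worker process.
    try:
        buf = base64.b64decode(b64_pkt, validate=True)
    except binascii.Error as err:
        print('Base64 Exception: {}'.format(err))
        return None

    # Bytes -> float64 matrix, normalized to [0, 1], mean-removed,
    # then zero-padded/truncated to a fixed length.
    mat = np.array([[int(v) for v in buf]], dtype='float64') / 255.0
    mat -= np.mean(mat, dtype='float64')
    return preprocessing.sequence.pad_sequences(
        mat, padding='post', maxlen=INPUT_SIZE, truncating='post', dtype='float64')[0]

print(preprocess_pkt(base64.b64encode(b'\x00\x01\xfe\xff')))
print(preprocess_pkt('not-base64!'))  # caught, returns None

Note the `dtype='float64'` on pad_sequences: without it the padded result is cast back to integers, which silently destroys the normalized values -- the invalid preprocessing this commit fixes.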
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index db0b60fd2..8ffeeaa12 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -1422,7 +1422,7 @@ static int mainloop(int epollfd)
             {
                 logger(1, "Epoll event error: %s", (errno != 0 ? strerror(errno) : "unknown"));
             }
-            continue;
+            break;
         }

         if (events[i].data.fd == collector_un_sockfd || events[i].data.fd == distributor_un_sockfd ||
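This one-line nDPIsrvd.c change implements the second bullet of the commit message: once an error/hangup event (e.g. a disconnected client) has been handled, the remaining entries of the current `epoll_wait()` batch may refer to state that was just torn down, so the loop now breaks and re-runs `epoll_wait()` instead of continuing. A rough Python analogue of that pattern, using `select.epoll` with hypothetical handler logic (a sketch, not the nDPIsrvd code):

import select

def event_loop(server_sock):
    # Sketch only: accept clients and watch them with epoll.
    epoll = select.epoll()
    epoll.register(server_sock.fileno(), select.EPOLLIN)
    clients = {}

    while True:
        for fd, mask in epoll.poll(timeout=1.0):
            if mask & (select.EPOLLERR | select.EPOLLHUP):
                # A client vanished: drop its state, then *break* instead
                # of *continue* -- later (fd, mask) pairs from this batch
                # may reference the state just torn down, so re-run poll().
                epoll.unregister(fd)
                conn = clients.pop(fd, None)
                if conn is not None:
                    conn.close()
                break
            elif fd == server_sock.fileno():
                conn, _ = server_sock.accept()
                epoll.register(conn.fileno(), select.EPOLLIN)
                clients[conn.fileno()] = conn
            else:
                clients[fd].recv(4096)  # handle client data (omitted)

The cost is a few extra `epoll_wait()` syscalls after a disconnect; the benefit is that no handler ever sees an event for a peer whose bookkeeping has already been freed.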