aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/build-freebsd.yml2
-rw-r--r--.github/workflows/build.yml45
-rw-r--r--CMakeLists.txt14
-rw-r--r--examples/README.md6
-rw-r--r--examples/rs-simple/Cargo.toml20
-rw-r--r--examples/rs-simple/src/main.rs590
6 files changed, 659 insertions, 18 deletions
diff --git a/.github/workflows/build-freebsd.yml b/.github/workflows/build-freebsd.yml
index 1dc5ac278..590d4156d 100644
--- a/.github/workflows/build-freebsd.yml
+++ b/.github/workflows/build-freebsd.yml
@@ -23,7 +23,7 @@ jobs:
- uses: actions/checkout@v4
- name: Test in FreeBSD
id: test
- uses: vmactions/freebsd-vm@v1
+ uses: vmactions/freebsd-vm@main
with:
usesh: true
prepare: |
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 3bf6b5c49..42c18d405 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -33,6 +33,7 @@ jobs:
os: "ubuntu-latest"
ndpi_build: "-DBUILD_NDPI=ON"
ndpid_examples: "-DBUILD_EXAMPLES=ON"
+ ndpid_rust_examples: "-DBUILD_RUST_EXAMPLES=ON"
ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=OFF"
ndpid_zlib: "-DENABLE_ZLIB=ON"
ndpid_extras: ""
@@ -46,6 +47,7 @@ jobs:
os: "ubuntu-latest"
ndpi_build: "-DBUILD_NDPI=ON"
ndpid_examples: "-DBUILD_EXAMPLES=ON"
+ ndpid_rust_examples: ""
ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=ON"
ndpid_zlib: "-DENABLE_ZLIB=ON"
ndpid_extras: "-DNDPI_WITH_MAXMINDDB=ON -DNDPI_WITH_PCRE=ON -DENABLE_MEMORY_PROFILING=ON"
@@ -59,6 +61,7 @@ jobs:
os: "ubuntu-latest"
ndpi_build: "-DBUILD_NDPI=ON"
ndpid_examples: "-DBUILD_EXAMPLES=ON"
+ ndpid_rust_examples: ""
ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=OFF"
ndpid_zlib: "-DENABLE_ZLIB=OFF"
ndpid_extras: ""
@@ -72,6 +75,7 @@ jobs:
os: "ubuntu-latest"
ndpi_build: "-DBUILD_NDPI=ON"
ndpid_examples: "-DBUILD_EXAMPLES=ON"
+ ndpid_rust_examples: ""
ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=OFF"
ndpid_zlib: "-DENABLE_ZLIB=ON"
ndpid_extras: ""
@@ -84,6 +88,7 @@ jobs:
os: "ubuntu-latest"
ndpi_build: "-DBUILD_NDPI=ON"
ndpid_examples: "-DBUILD_EXAMPLES=ON"
+ ndpid_rust_examples: ""
ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=OFF"
ndpid_zlib: "-DENABLE_ZLIB=ON"
ndpid_extras: ""
@@ -93,9 +98,10 @@ jobs:
upload: false
ndpi_min_version: "4.14"
- compiler: "clang-12"
- os: "ubuntu-20.04"
+ os: "ubuntu-22.04"
ndpi_build: "-DBUILD_NDPI=ON"
ndpid_examples: "-DBUILD_EXAMPLES=ON"
+ ndpid_rust_examples: ""
ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=OFF"
ndpid_zlib: "-DENABLE_ZLIB=ON"
ndpid_extras: ""
@@ -105,9 +111,10 @@ jobs:
upload: false
ndpi_min_version: "4.14"
- compiler: "gcc-10"
- os: "ubuntu-20.04"
+ os: "ubuntu-22.04"
ndpi_build: "-DBUILD_NDPI=ON"
ndpid_examples: "-DBUILD_EXAMPLES=ON"
+ ndpid_rust_examples: ""
ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=OFF"
ndpid_zlib: "-DENABLE_ZLIB=OFF"
ndpid_extras: ""
@@ -116,10 +123,11 @@ jobs:
poll: "-DFORCE_POLL=ON"
upload: false
ndpi_min_version: "4.14"
- - compiler: "gcc-7"
- os: "ubuntu-20.04"
+ - compiler: "gcc-9"
+ os: "ubuntu-22.04"
ndpi_build: "-DBUILD_NDPI=ON"
ndpid_examples: "-DBUILD_EXAMPLES=ON"
+ ndpid_rust_examples: ""
ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=OFF"
ndpid_zlib: "-DENABLE_ZLIB=ON"
ndpid_extras: ""
@@ -132,6 +140,7 @@ jobs:
os: "macOS-13"
ndpi_build: "-DBUILD_NDPI=OFF"
ndpid_examples: "-DBUILD_EXAMPLES=OFF"
+ ndpid_rust_examples: ""
ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=OFF"
ndpid_zlib: "-DENABLE_ZLIB=ON"
ndpid_extras: ""
@@ -169,7 +178,7 @@ jobs:
- name: Install MacOS Prerequisites
if: startsWith(matrix.os, 'macOS')
run: |
- brew install coreutils flock automake make unzip cmake pkg-config git wget
+ brew install coreutils automake make unzip
wget 'https://www.tcpdump.org/release/libpcap-1.10.4.tar.gz'
tar -xzvf libpcap-1.10.4.tar.gz
cd libpcap-1.10.4
@@ -191,6 +200,10 @@ jobs:
sudo apt-get update
sudo apt-get install autoconf automake cmake libtool pkg-config gettext libjson-c-dev flex bison libpcap-dev zlib1g-dev libcurl4-openssl-dev libdbus-1-dev
sudo apt-get install ${{ matrix.compiler }} lcov iproute2
+ - name: Install Ubuntu Prerequisites (Rust/Cargo)
+ if: startsWith(matrix.os, 'ubuntu') && startsWith(matrix.ndpid_rust_examples, '-DBUILD_RUST_EXAMPLES=ON')
+ run: |
+ sudo apt-get install cargo
- name: Install Ubuntu Prerequisites (libgcrypt)
if: startsWith(matrix.os, 'ubuntu') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=ON')
run: |
@@ -204,7 +217,7 @@ jobs:
run: |
sudo apt-get install libmaxminddb-dev libpcre2-dev
- name: Install Ubuntu Prerequisites (libnl-genl-3-dev)
- if: endsWith(matrix.compiler, 'gcc-7') == false && startsWith(matrix.ndpi_build, '-DBUILD_NDPI=ON') && startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON')
+ if: startsWith(matrix.ndpi_build, '-DBUILD_NDPI=ON') && startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON')
run: |
sudo apt-get install libnl-genl-3-dev
- name: Checking Network Buffer Size
@@ -217,7 +230,7 @@ jobs:
cmake -S . -B build -DCMAKE_C_COMPILER="$CMAKE_C_COMPILER" -DCMAKE_C_FLAGS="$CMAKE_C_FLAGS" -DCMAKE_MODULE_LINKER_FLAGS="$CMAKE_MODULE_LINKER_FLAGS" -DCMAKE_C_EXE_LINKER_FLAGS="$CMAKE_C_EXE_LINKER_FLAGS" \
-DENABLE_DBUS=ON -DENABLE_CURL=ON -DENABLE_SYSTEMD=ON \
${{ matrix.poll }} ${{ matrix.coverage }} ${{ matrix.sanitizer }} ${{ matrix.ndpi_build }} \
- ${{ matrix.ndpid_examples }} ${{ matrix.ndpid_zlib }} ${{ matrix.ndpid_gcrypt }} ${{ matrix.ndpid_extras }}
+ ${{ matrix.ndpid_examples }} ${{ matrix.ndpid_rust_examples }} ${{ matrix.ndpid_zlib }} ${{ matrix.ndpid_gcrypt }} ${{ matrix.ndpid_extras }}
- name: Build nDPId
run: |
cmake --build build --verbose
@@ -237,7 +250,7 @@ jobs:
-I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include \
-o /tmp/a.out
- name: Build single nDPId/nDPIsrvd executables (invoke CC directly - static nDPI lib)
- if: endsWith(matrix.compiler, 'gcc-7') == false && startsWith(matrix.ndpi_build, '-DBUILD_NDPI=ON') && startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON')
+ if: startsWith(matrix.ndpi_build, '-DBUILD_NDPI=ON') && startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON')
run: |
cc -Wall -Wextra -std=gnu99 ${{ matrix.poll }} -DENABLE_ZLIB=1 -DENABLE_MEMORY_PROFILING=1 \
-fsanitize=address -fsanitize=undefined -fno-sanitize=alignment -fsanitize=enum -fsanitize=leak \
@@ -276,7 +289,7 @@ jobs:
cmake -S . -B ./build \
-DENABLE_DBUS=ON -DENABLE_CURL=ON -DENABLE_SYSTEMD=ON \
${{ matrix.poll }} ${{ matrix.coverage }} ${{ matrix.sanitizer }} ${{ matrix.ndpi_build }} \
- ${{ matrix.ndpid_examples }} ${{ matrix.ndpid_zlib }} ${{ matrix.ndpid_gcrypt }} ${{ matrix.ndpid_extras }}
+ ${{ matrix.ndpid_examples }} ${{ matrix.ndpid_rust_examples }} ${{ matrix.ndpid_zlib }} ${{ matrix.ndpid_gcrypt }} ${{ matrix.ndpid_extras }}
cd ../..
rm -rf "nDPId-dist-${RAND_ID}"
- name: CPack DEB
@@ -308,11 +321,11 @@ jobs:
sudo systemctl stop ndpisrvd.service
journalctl --no-tail --no-pager -u ndpisrvd.service -u ndpid@lo.service
- name: Build PF_RING and nDPId (invoke CC directly - dynamic nDPI lib)
- if: endsWith(matrix.compiler, 'gcc-7') == false && startsWith(matrix.ndpi_build, '-DBUILD_NDPI=ON') && startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON')
+ if: startsWith(matrix.ndpi_build, '-DBUILD_NDPI=ON') && startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON')
run: |
git clone --depth=1 https://github.com/ntop/PF_RING.git
- cd PF_RING && make all && sudo make install prefix=/usr
- cd ..
+ cd PF_RING/userland && ./configure && make && sudo make install prefix=/usr
+ cd ../..
cc -Wall -Wextra -std=gnu99 ${{ matrix.poll }} -DENABLE_PFRING=1 -DENABLE_ZLIB=1 -DENABLE_MEMORY_PROFILING=1 \
-fsanitize=address -fsanitize=undefined -fno-sanitize=alignment -fsanitize=enum -fsanitize=leak \
nDPId.c npfring.c nio.c utils.c \
@@ -320,7 +333,7 @@ jobs:
-I./build/libnDPI/include/ndpi \
-I./PF_RING/userland/lib -I./PF_RING/kernel \
-o /tmp/a.out \
- -ldl ./PF_RING/userland/lib/libpfring.a -lpcap ./build/libnDPI/lib/libndpi.a -pthread -lm -lz
+ -ldl /usr/lib/libpfring.a -lpcap ./build/libnDPI/lib/libndpi.a -pthread -lm -lz
- name: Build against libnDPI-${{ matrix.ndpi_min_version }}
if: startsWith(matrix.os, 'ubuntu')
run: |
@@ -351,7 +364,7 @@ jobs:
-DBUILD_NDPI=OFF -DBUILD_EXAMPLES=ON \
-DENABLE_DBUS=ON -DENABLE_CURL=ON -DENABLE_SYSTEMD=ON \
${{ matrix.poll }} ${{ matrix.coverage }} \
- ${{ matrix.sanitizer }} ${{ matrix.ndpid_examples }}; }
+ ${{ matrix.sanitizer }} ${{ matrix.ndpid_examples }} ${{ matrix.ndpid_rust_examples }}; }
test $WGET_RET -ne 0 || { echo "Running Make.. (pkgconfig)"; \
cmake --build ./build-local-pkgconfig --verbose; }
test $WGET_RET -ne 0 || { echo "Testing Executable.. (pkgconfig)"; \
@@ -366,7 +379,7 @@ jobs:
-DENABLE_DBUS=ON -DENABLE_CURL=ON -DENABLE_SYSTEMD=ON \
-DNDPI_NO_PKGCONFIG=ON -DSTATIC_LIBNDPI_INSTALLDIR=/usr \
${{ matrix.poll }} ${{ matrix.coverage }} ${{ matrix.ndpid_gcrypt }} \
- ${{ matrix.sanitizer }} ${{ matrix.ndpid_examples }}; }
+ ${{ matrix.sanitizer }} ${{ matrix.ndpid_examples }} ${{ matrix.ndpid_rust_examples }}; }
test $WGET_RET -ne 0 || { echo "Running Make.. (static)"; \
cmake --build ./build-local-static --verbose; }
test $WGET_RET -ne 0 || { echo "Testing Executable.. (static)"; \
@@ -383,7 +396,7 @@ jobs:
-DPFRING_LINK_STATIC=OFF \
-DPFRING_INSTALLDIR=/usr -DPFRING_KERNEL_INC="$(realpath ./PF_RING/kernel)" \
${{ matrix.poll }} ${{ matrix.coverage }} ${{ matrix.ndpid_gcrypt }} \
- ${{ matrix.sanitizer }} ${{ matrix.ndpid_examples }}; }
+ ${{ matrix.sanitizer }} ${{ matrix.ndpid_examples }} ${{ matrix.ndpid_rust_examples }}; }
test $WGET_RET -ne 0 || test ! -d ./PF_RING || { echo "Running Make.. (PF_RING)"; \
cmake --build ./build-local-pfring --verbose; }
test $WGET_RET -ne 0 || test ! -d ./PF_RING || { echo "Testing Executable.. (PF_RING)"; \
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1bcae37de..f254c6122 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -94,6 +94,7 @@ option(ENABLE_MEMORY_PROFILING "Enable dynamic memory tracking." OFF)
option(ENABLE_ZLIB "Enable zlib support for nDPId (experimental)." OFF)
option(ENABLE_SYSTEMD "Install systemd components." OFF)
option(BUILD_EXAMPLES "Build C examples." ON)
+option(BUILD_RUST_EXAMPLES "Build Rust examples." OFF)
if(BUILD_EXAMPLES)
option(ENABLE_DBUS "Build DBus notification example." OFF)
option(ENABLE_CURL "Build influxdb data write example." OFF)
@@ -555,6 +556,18 @@ if(BUILD_EXAMPLES)
install(DIRECTORY examples/c-collectd/www DESTINATION share/nDPId/nDPIsrvd-collectd)
endif()
+if(BUILD_RUST_EXAMPLES)
+ add_custom_command(
+ OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/target/release/rs-simple
+ COMMAND cargo build --release
+ WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/examples/rs-simple
+ COMMENT "Build Rust executable with cargo: rs-simple"
+ )
+ add_custom_target(rs-simple ALL
+ DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/target/release/rs-simple
+ )
+endif()
+
if(ENABLE_SYSTEMD)
configure_file(packages/systemd/ndpisrvd.service.in ndpisrvd.service @ONLY)
configure_file(packages/systemd/ndpid@.service.in ndpid@.service @ONLY)
@@ -621,6 +634,7 @@ message(STATUS "ENABLE_MEMORY_PROFILING..: ${ENABLE_MEMORY_PROFILING}")
message(STATUS "ENABLE_ZLIB..............: ${ENABLE_ZLIB}")
message(STATUS "BUILD_NDPI...............: ${BUILD_NDPI}")
message(STATUS "BUILD_EXAMPLES...........: ${BUILD_EXAMPLES}")
+message(STATUS "BUILD_RUST_EXAMPLES......: ${BUILD_RUST_EXAMPLES}")
if(BUILD_EXAMPLES)
message(STATUS "ENABLE_DBUS..............: ${ENABLE_DBUS}")
message(STATUS "ENABLE_CURL..............: ${ENABLE_CURL}")
diff --git a/examples/README.md b/examples/README.md
index 524fa489d..0a75883be 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -93,7 +93,11 @@ Required by `tests/run_tests.sh`
Validate nDPId JSON messages against internal event semantics.
Required by `tests/run_tests.sh`
+## rs-simple
+
+A straight forward Rust deserialization/parsing example.
+
## yaml-filebeat
An example filebeat configuration to parse and send nDPId JSON
messages to Elasticsearch. Allowing long term storage and data visualization with kibana
-and various other tools that interact with Elasticsearch (No logstash required). \ No newline at end of file
+and various other tools that interact with Elasticsearch (No logstash required).
diff --git a/examples/rs-simple/Cargo.toml b/examples/rs-simple/Cargo.toml
new file mode 100644
index 000000000..b5e0eebb5
--- /dev/null
+++ b/examples/rs-simple/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "rs-simple"
+version = "0.1.0"
+authors = ["Toni Uhlig <toni@impl.cc>"]
+edition = "2024"
+
+[dependencies]
+bytes = "1"
+crossterm = "0.29.0"
+io = "0.0.2"
+moka = { version = "0.12.10", features = ["future"] }
+serde = { version = "1", features = ["derive"] }
+serde_json = "1.0"
+tokio = { version = "1", features = ["full"] }
+tui = "0.19.0"
+
+[profile.release]
+strip = true
+lto = true
+codegen-units = 1
diff --git a/examples/rs-simple/src/main.rs b/examples/rs-simple/src/main.rs
new file mode 100644
index 000000000..647c351c2
--- /dev/null
+++ b/examples/rs-simple/src/main.rs
@@ -0,0 +1,590 @@
+use bytes::BytesMut;
+use crossterm::{
+ cursor,
+ event::{self, KeyCode, KeyEvent},
+ ExecutableCommand,
+ terminal::{self, ClearType},
+};
+use moka::{future::Cache, Expiry};
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use std::{
+ fmt,
+ hash::{Hash, Hasher},
+ io::self,
+ sync::Arc,
+ time::{Duration, Instant, SystemTime, UNIX_EPOCH},
+};
+use tokio::io::AsyncReadExt;
+use tokio::sync::mpsc;
+use tokio::sync::Mutex;
+use tokio::sync::MutexGuard;
+use tokio::net::TcpStream;
+use tui::{
+ backend::CrosstermBackend,
+ layout::{Layout, Constraint, Direction},
+ style::{Style, Color, Modifier},
+ Terminal,
+ widgets::{Block, Borders, List, ListItem, Row, Table, TableState},
+};
+
+#[derive(Debug)]
+enum ParseError {
+ Protocol(),
+ Json(),
+ Schema(),
+}
+
+impl From<serde_json::Error> for ParseError {
+ fn from(_: serde_json::Error) -> Self {
+ ParseError::Json()
+ }
+}
+
+#[derive(Serialize, Deserialize, Debug, PartialEq)]
+#[serde(rename_all = "lowercase")]
+enum EventName {
+ Invalid, New, End, Idle, Update, Analyse,
+ Guessed, Detected,
+ #[serde(rename = "detection-update")]
+ DetectionUpdate,
+ #[serde(rename = "not-detected")]
+ NotDetected,
+}
+
+#[derive(Serialize, Deserialize, Copy, Clone, Debug)]
+#[serde(rename_all = "lowercase")]
+enum State {
+ Unknown, Info, Finished,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct FlowEvent {
+ #[serde(rename = "flow_event_name")]
+ name: EventName,
+ #[serde(rename = "flow_id")]
+ id: u64,
+ #[serde(rename = "flow_state")]
+ state: State,
+ #[serde(rename = "flow_first_seen")]
+ first_seen: u64,
+ #[serde(rename = "flow_src_last_pkt_time")]
+ src_last_pkt_time: u64,
+ #[serde(rename = "flow_dst_last_pkt_time")]
+ dst_last_pkt_time: u64,
+ #[serde(rename = "flow_idle_time")]
+ idle_time: u64,
+ #[serde(rename = "flow_src_packets_processed")]
+ src_packets_processed: u64,
+ #[serde(rename = "flow_dst_packets_processed")]
+ dst_packets_processed: u64,
+ #[serde(rename = "flow_src_tot_l4_payload_len")]
+ src_tot_l4_payload_len: u64,
+ #[serde(rename = "flow_dst_tot_l4_payload_len")]
+ dst_tot_l4_payload_len: u64,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct PacketEvent {
+ pkt_datalink: u16,
+ pkt_caplen: u64,
+ pkt_len: u64,
+ pkt_l4_len: u64,
+}
+
+#[derive(Debug)]
+enum EventType {
+ Flow(FlowEvent),
+ Packet(PacketEvent),
+ Other(),
+}
+
+#[derive(Default)]
+struct Stats {
+ ui_updates: u64,
+ flow_count: u64,
+ parse_errors: u64,
+ events: u64,
+ flow_events: u64,
+ packet_events: u64,
+ total_caplen: u64,
+ total_len: u64,
+ total_l4_len: u64,
+}
+
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+enum Expiration {
+ FlowIdleTime(u64),
+}
+
+struct FlowExpiry;
+
+#[derive(Clone, Eq, Default, Debug)]
+struct FlowKey {
+ id: u64,
+}
+
+#[derive(Clone, Debug)]
+struct FlowValue {
+ state: State,
+ total_src_packets: u64,
+ total_dst_packets: u64,
+ total_src_bytes: u64,
+ total_dst_bytes: u64,
+ first_seen: std::time::SystemTime,
+ last_seen: std::time::SystemTime,
+ timeout_in: std::time::SystemTime,
+}
+
+impl Default for State {
+ fn default() -> State {
+ State::Unknown
+ }
+}
+
+impl fmt::Display for State {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ State::Unknown => write!(f, "N/A"),
+ State::Info => write!(f, "Info"),
+ State::Finished => write!(f, "Finished"),
+ }
+ }
+}
+
+impl Expiration {
+ fn as_duration(&self) -> Option<Duration> {
+ match self {
+ Expiration::FlowIdleTime(value) => Some(Duration::from_micros(*value)),
+ }
+ }
+}
+
+impl fmt::Display for Expiration {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self.as_duration() {
+ Some(duration) => {
+ let secs = duration.as_secs();
+ write!(f, "{} s", secs)
+ }
+ None => write!(f, "N/A"),
+ }
+ }
+}
+
+impl Expiry<FlowKey, (Expiration, FlowValue)> for FlowExpiry {
+ fn expire_after_create(
+ &self,
+ _key: &FlowKey,
+ value: &(Expiration, FlowValue),
+ _current_time: Instant,
+ ) -> Option<Duration> {
+ value.0.as_duration()
+ }
+}
+
+impl Hash for FlowKey {
+ fn hash<H: Hasher>(&self, state: &mut H) {
+ self.id.hash(state)
+ }
+}
+
+impl PartialEq for FlowKey {
+ fn eq(&self, other: &Self) -> bool {
+ self.id == other.id
+ }
+}
+
+#[tokio::main]
+async fn main() {
+ let server_address = "127.0.0.1:7000";
+
+ let mut stream = match TcpStream::connect(server_address).await {
+ Ok(stream) => stream,
+ Err(e) => {
+ eprintln!("Connection to {} failed: {}", server_address, e);
+ return;
+ }
+ };
+ if let Err(e) = terminal::enable_raw_mode() {
+ eprintln!("Could not enable terminal raw mode: {}", e);
+ return;
+ }
+ let mut stdout = io::stdout();
+ if let Err(e) = stdout.execute(terminal::Clear(ClearType::All)) {
+ eprintln!("Could not clear your terminal: {}", e);
+ return;
+ }
+ if let Err(e) = stdout.execute(cursor::Hide) {
+ eprintln!("Could not hide your cursor: {}", e);
+ return;
+ }
+ let backend = CrosstermBackend::new(stdout);
+ let mut terminal = Terminal::new(backend);
+
+ let mut buffer = BytesMut::with_capacity(33792usize);
+ let (tx, mut rx): (mpsc::Sender<String>, mpsc::Receiver<String>) = mpsc::channel(1024);
+ let data = Arc::new(Mutex::new(Stats::default()));
+ let data_tx = Arc::clone(&data);
+ let data_rx = Arc::clone(&data);
+ let flow_cache: Arc<Cache<FlowKey, (Expiration, FlowValue)>> = Arc::new(Cache::builder()
+ .expire_after(FlowExpiry)
+ .build());
+ let flow_cache_rx = Arc::clone(&flow_cache);
+
+ tokio::spawn(async move {
+ while let Some(msg) = rx.recv().await {
+ match parse_json(&msg) {
+ Ok(message) => {
+ let mut data_lock = data_tx.lock().await;
+ data_lock.events += 1;
+ update_stats(&message, &mut data_lock, &flow_cache).await;
+ }
+ Err(_message) => {
+ let mut data_lock = data_tx.lock().await;
+ data_lock.parse_errors += 1;
+ }
+ }
+ }
+ });
+ tokio::spawn(async move {
+ loop {
+ let n = match stream.read_buf(&mut buffer).await {
+ Ok(len) => len,
+ Err(_) => {
+ continue; // Versuche es erneut, wenn ein Fehler auftritt
+ }
+ };
+ if n == 0 {
+ break;
+ }
+
+ while let Some(message) = parse_message(&mut buffer) {
+ match tx.send(message).await {
+ Ok(_) => (),
+ Err(_) => return
+ }
+ }
+ }
+ });
+
+ let mut table_state = TableState::default();
+
+ loop {
+ let flows: Vec<(FlowKey, (Expiration, FlowValue))> = flow_cache_rx.iter().map(|(k, v)| (k.as_ref().clone(), v.clone())).collect();
+ let mut table_selected = match table_state.selected() {
+ Some(table_index) => {
+ if flows.len() > 0 && table_index >= flows.len() {
+ flows.len() - 1
+ } else {
+ table_index
+ }
+ }
+ None => 0,
+ };
+
+ match read_keypress() {
+ Some(KeyCode::Esc) => break,
+ Some(KeyCode::Char('q')) => break,
+ Some(KeyCode::Up) => {
+ table_selected = match table_selected {
+ _ if flows.len() == 0 => 0,
+ i if i == 0 => flows.len() - 1,
+ i => i - 1,
+ };
+ },
+ Some(KeyCode::Down) => {
+ table_selected = match table_selected {
+ i if flows.len() == 0 || i >= flows.len() - 1 => 0,
+ i => i + 1,
+ };
+ },
+ Some(KeyCode::PageUp) => {
+ table_selected = match table_selected {
+ _ if flows.len() == 0 => 0,
+ i if i == 0 => flows.len() - 1,
+ i if i < 10 => 0,
+ i => i - 10,
+ };
+ },
+ Some(KeyCode::PageDown) => {
+ table_selected = match table_selected {
+ i if flows.len() == 0 || i >= flows.len() - 1 => 0,
+ i if flows.len() < 10 || i >= flows.len() - 10 => flows.len() - 1,
+ i => i + 10,
+ };
+ },
+ Some(KeyCode::Home) => {
+ table_selected = 0;
+ },
+ Some(KeyCode::End) => {
+ table_selected = match table_selected {
+ _ if flows.len() == 0 => 0,
+ _ => flows.len() - 1,
+ };
+ },
+ Some(_) => (),
+ None => ()
+ };
+
+ let mut data_lock = data_rx.lock().await;
+ data_lock.ui_updates += 1;
+ draw_ui(terminal.as_mut().unwrap(), &mut table_state, table_selected, &data_lock, &flows);
+ }
+
+ if let Err(e) = terminal.unwrap().backend_mut().execute(cursor::Show) {
+ eprintln!("Could not show your cursor: {}", e);
+ return;
+ }
+ let mut stdout = io::stdout();
+ if let Err(e) = stdout.execute(terminal::Clear(ClearType::All)) {
+ eprintln!("Could not clear your terminal: {}", e);
+ return;
+ }
+ if let Err(e) = terminal::disable_raw_mode() {
+ eprintln!("Could not disable raw mode: {}", e);
+ return;
+ }
+ println!("\nDone.");
+}
+
+fn read_keypress() -> Option<KeyCode> {
+ if event::poll(Duration::from_millis(500)).unwrap() {
+ if let event::Event::Key(KeyEvent { code, .. }) = event::read().unwrap() {
+ return Some(code);
+ }
+ }
+
+ None
+}
+
+fn parse_message(buffer: &mut BytesMut) -> Option<String> {
+ if let Some(pos) = buffer.iter().position(|&b| b == b'\n') {
+ let message = buffer.split_to(pos + 1);
+ return Some(String::from_utf8_lossy(&message).to_string());
+ }
+
+ None
+}
+
+fn parse_json(data: &str) -> Result<EventType, ParseError> {
+ let first_non_digit = data.find(|c: char| !c.is_ascii_digit()).unwrap_or(0);
+ let length_str = &data[0..first_non_digit];
+ let length: usize = length_str.parse().unwrap_or(0);
+ if length == 0 {
+ return Err(ParseError::Protocol());
+ }
+
+ let json_str = &data[first_non_digit..first_non_digit + length];
+ let value: Value = serde_json::from_str(json_str).map_err(|_| ParseError::Json()).unwrap();
+ if value.get("flow_event_name").is_some() {
+ let flow_event: FlowEvent = serde_json::from_value(value)?;
+ return Ok(EventType::Flow(flow_event));
+ } else if value.get("packet_event_name").is_some() {
+ let packet_event: PacketEvent = serde_json::from_value(value)?;
+ return Ok(EventType::Packet(packet_event));
+ } else if value.get("daemon_event_name").is_some() ||
+ value.get("error_event_name").is_some() {
+ return Ok(EventType::Other());
+ }
+
+ Err(ParseError::Schema())
+}
+
+async fn update_stats(event: &EventType, stats: &mut MutexGuard<'_, Stats>, cache: &Cache<FlowKey, (Expiration, FlowValue)>) {
+ match &event {
+ EventType::Flow(flow_event) => {
+ stats.flow_events += 1;
+ stats.flow_count = cache.entry_count();
+ let key = FlowKey { id: flow_event.id };
+
+ if flow_event.name == EventName::End ||
+ flow_event.name == EventName::Idle
+ {
+ cache.remove(&key).await;
+ return;
+ }
+
+ let first_seen_seconds = flow_event.first_seen / 1_000_000;
+ let first_seen_nanos = (flow_event.first_seen % 1_000_000) * 1_000;
+ let first_seen_epoch = std::time::Duration::new(first_seen_seconds, first_seen_nanos as u32);
+ let first_seen_system = UNIX_EPOCH + first_seen_epoch;
+
+ let last_seen = std::cmp::max(flow_event.src_last_pkt_time,
+ flow_event.dst_last_pkt_time);
+ let last_seen_seconds = last_seen / 1_000_000;
+ let last_seen_nanos = (last_seen % 1_000_000) * 1_000;
+ let last_seen_epoch = std::time::Duration::new(last_seen_seconds, last_seen_nanos as u32);
+ let last_seen_system = UNIX_EPOCH + last_seen_epoch;
+
+ let timeout_seconds = (last_seen + flow_event.idle_time) / 1_000_000;
+ let timeout_nanos = ((last_seen + flow_event.idle_time) % 1_000_000) * 1_000;
+ let timeout_epoch = std::time::Duration::new(timeout_seconds, timeout_nanos as u32);
+ let timeout_system = UNIX_EPOCH + timeout_epoch;
+
+ let value = FlowValue {
+ state: flow_event.state,
+ total_src_packets: flow_event.src_packets_processed,
+ total_dst_packets: flow_event.dst_packets_processed,
+ total_src_bytes: flow_event.src_tot_l4_payload_len,
+ total_dst_bytes: flow_event.dst_tot_l4_payload_len,
+ first_seen: first_seen_system,
+ last_seen: last_seen_system,
+ timeout_in: timeout_system,
+ };
+ cache.insert(key, (Expiration::FlowIdleTime(flow_event.idle_time), value)).await;
+ }
+ EventType::Packet(packet_event) => {
+ stats.packet_events += 1;
+ stats.total_caplen += packet_event.pkt_caplen;
+ stats.total_len += packet_event.pkt_len;
+ stats.total_l4_len += packet_event.pkt_l4_len;
+ }
+ EventType::Other() => {}
+ }
+}
+
+fn format_bytes(bytes: u64) -> String {
+ const KB: u64 = 1024;
+ const MB: u64 = KB * 1024;
+ const GB: u64 = MB * 1024;
+
+ if bytes >= GB {
+ format!("{} GB", bytes / GB)
+ } else if bytes >= MB {
+ format!("{} MB", bytes / MB)
+ } else if bytes >= KB {
+ format!("{} kB", bytes / KB)
+ } else {
+ format!("{} B", bytes)
+ }
+}
+
+fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, table_state: &mut TableState, table_selected: usize, data: &MutexGuard<Stats>, flows: &Vec<(FlowKey, (Expiration, FlowValue))>) {
+ let general_items = vec![
+ ListItem::new("TUI Updates..: ".to_owned() + &data.ui_updates.to_string()),
+ ListItem::new("Flows Cached.: ".to_owned() + &data.flow_count.to_string()),
+ ListItem::new("Total Events.: ".to_owned() + &data.events.to_string()),
+ ListItem::new("Parse Errors.: ".to_owned() + &data.parse_errors.to_string()),
+ ListItem::new("Flow Events..: ".to_owned() + &data.flow_events.to_string()),
+ ListItem::new("Packet Events: ".to_owned() + &data.packet_events.to_string()),
+ ];
+ let packet_items = vec![
+ ListItem::new("Total Capture Length: ".to_owned() + &format_bytes(data.total_caplen)),
+ ListItem::new("Total Length........: ".to_owned() + &format_bytes(data.total_len)),
+ ListItem::new("Total L4 Length.....: ".to_owned() + &format_bytes(data.total_l4_len)),
+ ];
+ let table_rows: Vec<Row> = flows
+ .into_iter()
+ .map(|(key, (_exp, val))| {
+ let first_seen_display = match val.first_seen.elapsed() {
+ Ok(elapsed) => {
+ match elapsed.as_secs() {
+ t if t > (3_600 * 24) => format!("{} d ago", t / (3_600 * 24)),
+ t if t > 3_600 => format!("{} h ago", t / 3_600),
+ t if t > 60 => format!("{} min ago", t / 60),
+ t if t > 0 => format!("{} s ago", t),
+ t if t == 0 => "< 1 s ago".to_string(),
+ t => format!("INVALID: {}", t),
+ }
+ }
+ Err(err) => format!("ERROR: {}", err)
+ };
+
+ let last_seen_display = match val.last_seen.elapsed() {
+ Ok(elapsed) => {
+ match elapsed.as_secs() {
+ t if t > (3_600 * 24) => format!("{} d ago", t / (3_600 * 24)),
+ t if t > 3_600 => format!("{} h ago", t / 3_600),
+ t if t > 60 => format!("{} min ago", t / 60),
+ t if t > 0 => format!("{} s ago", t),
+ t if t == 0 => "< 1 s ago".to_string(),
+ t => format!("INVALID: {}", t),
+ }
+ }
+ Err(_err) => "ERROR".to_string()
+ };
+
+ let timeout_display = match val.timeout_in.duration_since(SystemTime::now()) {
+ Ok(elapsed) => {
+ match elapsed.as_secs() {
+ t if t > (3_600 * 24) => format!("in {} d", t / (3_600 * 24)),
+ t if t > 3_600 => format!("in {} h", t / 3_600),
+ t if t > 60 => format!("in {} min", t / 60),
+ t if t > 0 => format!("in {} s", t),
+ t if t == 0 => "in < 1 s".to_string(),
+ t => format!("INVALID: {}", t),
+ }
+ }
+ Err(_err) => "EXPIRED".to_string()
+ };
+
+ Row::new(vec![
+ key.id.to_string(),
+ val.state.to_string(),
+ first_seen_display,
+ last_seen_display,
+ timeout_display,
+ (val.total_src_packets + val.total_dst_packets).to_string(),
+ format_bytes(val.total_src_bytes + val.total_dst_bytes),
+ ])
+ })
+ .collect();
+
+ terminal.draw(|f| {
+ let size = f.size();
+
+ let chunks = Layout::default()
+ .direction(Direction::Vertical)
+ .constraints(
+ [
+ Constraint::Percentage(18),
+ Constraint::Percentage(82),
+ ].as_ref()
+ )
+ .split(size);
+
+ let top_chunks = Layout::default()
+ .direction(Direction::Horizontal)
+ .constraints(
+ [
+ Constraint::Percentage(50),
+ Constraint::Percentage(50),
+ ].as_ref()
+ )
+ .split(chunks[0]);
+
+ let table_selected_abs = match table_selected {
+ _ if flows.len() == 0 => 0,
+ i => i + 1,
+ };
+ let table = Table::new(table_rows)
+ .header(Row::new(vec!["Flow ID", "State", "First Seen", "Last Seen", "Timeout", "Total Packets", "Total Bytes"])
+ .style(Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD)))
+ .block(Block::default().title("Flow Table (selected: ".to_string() +
+ &table_selected_abs.to_string() +
+ "): " +
+ &flows.len().to_string() +
+ " item(s)").borders(Borders::ALL))
+ .highlight_style(Style::default().bg(Color::Blue))
+ .widths(&[
+ Constraint::Length(20),
+ Constraint::Length(20),
+ Constraint::Length(20),
+ Constraint::Length(20),
+ Constraint::Length(20),
+ Constraint::Length(20),
+ Constraint::Length(20),
+ ]);
+
+ let general_list = List::new(general_items)
+ .block(Block::default().title("General").borders(Borders::ALL));
+ let packet_list = List::new(packet_items)
+ .block(Block::default().title("Packet Events").borders(Borders::ALL));
+
+ table_state.select(Some(table_selected));
+ f.render_widget(general_list, top_chunks[0]);
+ f.render_widget(packet_list, top_chunks[1]);
+ f.render_stateful_widget(table, chunks[1], table_state);
+ }).unwrap();
+}