diff options
-rw-r--r-- | .github/workflows/build-freebsd.yml | 2 | ||||
-rw-r--r-- | .github/workflows/build.yml | 45 | ||||
-rw-r--r-- | CMakeLists.txt | 14 | ||||
-rw-r--r-- | examples/README.md | 6 | ||||
-rw-r--r-- | examples/rs-simple/Cargo.toml | 20 | ||||
-rw-r--r-- | examples/rs-simple/src/main.rs | 590 |
6 files changed, 659 insertions, 18 deletions
diff --git a/.github/workflows/build-freebsd.yml b/.github/workflows/build-freebsd.yml index 1dc5ac278..590d4156d 100644 --- a/.github/workflows/build-freebsd.yml +++ b/.github/workflows/build-freebsd.yml @@ -23,7 +23,7 @@ jobs: - uses: actions/checkout@v4 - name: Test in FreeBSD id: test - uses: vmactions/freebsd-vm@v1 + uses: vmactions/freebsd-vm@main with: usesh: true prepare: | diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 3bf6b5c49..42c18d405 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -33,6 +33,7 @@ jobs: os: "ubuntu-latest" ndpi_build: "-DBUILD_NDPI=ON" ndpid_examples: "-DBUILD_EXAMPLES=ON" + ndpid_rust_examples: "-DBUILD_RUST_EXAMPLES=ON" ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=OFF" ndpid_zlib: "-DENABLE_ZLIB=ON" ndpid_extras: "" @@ -46,6 +47,7 @@ jobs: os: "ubuntu-latest" ndpi_build: "-DBUILD_NDPI=ON" ndpid_examples: "-DBUILD_EXAMPLES=ON" + ndpid_rust_examples: "" ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=ON" ndpid_zlib: "-DENABLE_ZLIB=ON" ndpid_extras: "-DNDPI_WITH_MAXMINDDB=ON -DNDPI_WITH_PCRE=ON -DENABLE_MEMORY_PROFILING=ON" @@ -59,6 +61,7 @@ jobs: os: "ubuntu-latest" ndpi_build: "-DBUILD_NDPI=ON" ndpid_examples: "-DBUILD_EXAMPLES=ON" + ndpid_rust_examples: "" ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=OFF" ndpid_zlib: "-DENABLE_ZLIB=OFF" ndpid_extras: "" @@ -72,6 +75,7 @@ jobs: os: "ubuntu-latest" ndpi_build: "-DBUILD_NDPI=ON" ndpid_examples: "-DBUILD_EXAMPLES=ON" + ndpid_rust_examples: "" ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=OFF" ndpid_zlib: "-DENABLE_ZLIB=ON" ndpid_extras: "" @@ -84,6 +88,7 @@ jobs: os: "ubuntu-latest" ndpi_build: "-DBUILD_NDPI=ON" ndpid_examples: "-DBUILD_EXAMPLES=ON" + ndpid_rust_examples: "" ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=OFF" ndpid_zlib: "-DENABLE_ZLIB=ON" ndpid_extras: "" @@ -93,9 +98,10 @@ jobs: upload: false ndpi_min_version: "4.14" - compiler: "clang-12" - os: "ubuntu-20.04" + os: "ubuntu-22.04" ndpi_build: "-DBUILD_NDPI=ON" ndpid_examples: "-DBUILD_EXAMPLES=ON" + ndpid_rust_examples: "" ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=OFF" ndpid_zlib: "-DENABLE_ZLIB=ON" ndpid_extras: "" @@ -105,9 +111,10 @@ jobs: upload: false ndpi_min_version: "4.14" - compiler: "gcc-10" - os: "ubuntu-20.04" + os: "ubuntu-22.04" ndpi_build: "-DBUILD_NDPI=ON" ndpid_examples: "-DBUILD_EXAMPLES=ON" + ndpid_rust_examples: "" ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=OFF" ndpid_zlib: "-DENABLE_ZLIB=OFF" ndpid_extras: "" @@ -116,10 +123,11 @@ jobs: poll: "-DFORCE_POLL=ON" upload: false ndpi_min_version: "4.14" - - compiler: "gcc-7" - os: "ubuntu-20.04" + - compiler: "gcc-9" + os: "ubuntu-22.04" ndpi_build: "-DBUILD_NDPI=ON" ndpid_examples: "-DBUILD_EXAMPLES=ON" + ndpid_rust_examples: "" ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=OFF" ndpid_zlib: "-DENABLE_ZLIB=ON" ndpid_extras: "" @@ -132,6 +140,7 @@ jobs: os: "macOS-13" ndpi_build: "-DBUILD_NDPI=OFF" ndpid_examples: "-DBUILD_EXAMPLES=OFF" + ndpid_rust_examples: "" ndpid_gcrypt: "-DNDPI_WITH_GCRYPT=OFF" ndpid_zlib: "-DENABLE_ZLIB=ON" ndpid_extras: "" @@ -169,7 +178,7 @@ jobs: - name: Install MacOS Prerequisites if: startsWith(matrix.os, 'macOS') run: | - brew install coreutils flock automake make unzip cmake pkg-config git wget + brew install coreutils automake make unzip wget 'https://www.tcpdump.org/release/libpcap-1.10.4.tar.gz' tar -xzvf libpcap-1.10.4.tar.gz cd libpcap-1.10.4 @@ -191,6 +200,10 @@ jobs: sudo apt-get update sudo apt-get install autoconf automake cmake libtool pkg-config gettext libjson-c-dev flex bison libpcap-dev zlib1g-dev libcurl4-openssl-dev libdbus-1-dev sudo apt-get install ${{ matrix.compiler }} lcov iproute2 + - name: Install Ubuntu Prerequisites (Rust/Cargo) + if: startsWith(matrix.os, 'ubuntu') && startsWith(matrix.ndpid_rust_examples, '-DBUILD_RUST_EXAMPLES=ON') + run: | + sudo apt-get install cargo - name: Install Ubuntu Prerequisites (libgcrypt) if: startsWith(matrix.os, 'ubuntu') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=ON') run: | @@ -204,7 +217,7 @@ jobs: run: | sudo apt-get install libmaxminddb-dev libpcre2-dev - name: Install Ubuntu Prerequisites (libnl-genl-3-dev) - if: endsWith(matrix.compiler, 'gcc-7') == false && startsWith(matrix.ndpi_build, '-DBUILD_NDPI=ON') && startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON') + if: startsWith(matrix.ndpi_build, '-DBUILD_NDPI=ON') && startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON') run: | sudo apt-get install libnl-genl-3-dev - name: Checking Network Buffer Size @@ -217,7 +230,7 @@ jobs: cmake -S . -B build -DCMAKE_C_COMPILER="$CMAKE_C_COMPILER" -DCMAKE_C_FLAGS="$CMAKE_C_FLAGS" -DCMAKE_MODULE_LINKER_FLAGS="$CMAKE_MODULE_LINKER_FLAGS" -DCMAKE_C_EXE_LINKER_FLAGS="$CMAKE_C_EXE_LINKER_FLAGS" \ -DENABLE_DBUS=ON -DENABLE_CURL=ON -DENABLE_SYSTEMD=ON \ ${{ matrix.poll }} ${{ matrix.coverage }} ${{ matrix.sanitizer }} ${{ matrix.ndpi_build }} \ - ${{ matrix.ndpid_examples }} ${{ matrix.ndpid_zlib }} ${{ matrix.ndpid_gcrypt }} ${{ matrix.ndpid_extras }} + ${{ matrix.ndpid_examples }} ${{ matrix.ndpid_rust_examples }} ${{ matrix.ndpid_zlib }} ${{ matrix.ndpid_gcrypt }} ${{ matrix.ndpid_extras }} - name: Build nDPId run: | cmake --build build --verbose @@ -237,7 +250,7 @@ jobs: -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include \ -o /tmp/a.out - name: Build single nDPId/nDPIsrvd executables (invoke CC directly - static nDPI lib) - if: endsWith(matrix.compiler, 'gcc-7') == false && startsWith(matrix.ndpi_build, '-DBUILD_NDPI=ON') && startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON') + if: startsWith(matrix.ndpi_build, '-DBUILD_NDPI=ON') && startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON') run: | cc -Wall -Wextra -std=gnu99 ${{ matrix.poll }} -DENABLE_ZLIB=1 -DENABLE_MEMORY_PROFILING=1 \ -fsanitize=address -fsanitize=undefined -fno-sanitize=alignment -fsanitize=enum -fsanitize=leak \ @@ -276,7 +289,7 @@ jobs: cmake -S . -B ./build \ -DENABLE_DBUS=ON -DENABLE_CURL=ON -DENABLE_SYSTEMD=ON \ ${{ matrix.poll }} ${{ matrix.coverage }} ${{ matrix.sanitizer }} ${{ matrix.ndpi_build }} \ - ${{ matrix.ndpid_examples }} ${{ matrix.ndpid_zlib }} ${{ matrix.ndpid_gcrypt }} ${{ matrix.ndpid_extras }} + ${{ matrix.ndpid_examples }} ${{ matrix.ndpid_rust_examples }} ${{ matrix.ndpid_zlib }} ${{ matrix.ndpid_gcrypt }} ${{ matrix.ndpid_extras }} cd ../.. rm -rf "nDPId-dist-${RAND_ID}" - name: CPack DEB @@ -308,11 +321,11 @@ jobs: sudo systemctl stop ndpisrvd.service journalctl --no-tail --no-pager -u ndpisrvd.service -u ndpid@lo.service - name: Build PF_RING and nDPId (invoke CC directly - dynamic nDPI lib) - if: endsWith(matrix.compiler, 'gcc-7') == false && startsWith(matrix.ndpi_build, '-DBUILD_NDPI=ON') && startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON') + if: startsWith(matrix.ndpi_build, '-DBUILD_NDPI=ON') && startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON') run: | git clone --depth=1 https://github.com/ntop/PF_RING.git - cd PF_RING && make all && sudo make install prefix=/usr - cd .. + cd PF_RING/userland && ./configure && make && sudo make install prefix=/usr + cd ../.. cc -Wall -Wextra -std=gnu99 ${{ matrix.poll }} -DENABLE_PFRING=1 -DENABLE_ZLIB=1 -DENABLE_MEMORY_PROFILING=1 \ -fsanitize=address -fsanitize=undefined -fno-sanitize=alignment -fsanitize=enum -fsanitize=leak \ nDPId.c npfring.c nio.c utils.c \ @@ -320,7 +333,7 @@ jobs: -I./build/libnDPI/include/ndpi \ -I./PF_RING/userland/lib -I./PF_RING/kernel \ -o /tmp/a.out \ - -ldl ./PF_RING/userland/lib/libpfring.a -lpcap ./build/libnDPI/lib/libndpi.a -pthread -lm -lz + -ldl /usr/lib/libpfring.a -lpcap ./build/libnDPI/lib/libndpi.a -pthread -lm -lz - name: Build against libnDPI-${{ matrix.ndpi_min_version }} if: startsWith(matrix.os, 'ubuntu') run: | @@ -351,7 +364,7 @@ jobs: -DBUILD_NDPI=OFF -DBUILD_EXAMPLES=ON \ -DENABLE_DBUS=ON -DENABLE_CURL=ON -DENABLE_SYSTEMD=ON \ ${{ matrix.poll }} ${{ matrix.coverage }} \ - ${{ matrix.sanitizer }} ${{ matrix.ndpid_examples }}; } + ${{ matrix.sanitizer }} ${{ matrix.ndpid_examples }} ${{ matrix.ndpid_rust_examples }}; } test $WGET_RET -ne 0 || { echo "Running Make.. (pkgconfig)"; \ cmake --build ./build-local-pkgconfig --verbose; } test $WGET_RET -ne 0 || { echo "Testing Executable.. (pkgconfig)"; \ @@ -366,7 +379,7 @@ jobs: -DENABLE_DBUS=ON -DENABLE_CURL=ON -DENABLE_SYSTEMD=ON \ -DNDPI_NO_PKGCONFIG=ON -DSTATIC_LIBNDPI_INSTALLDIR=/usr \ ${{ matrix.poll }} ${{ matrix.coverage }} ${{ matrix.ndpid_gcrypt }} \ - ${{ matrix.sanitizer }} ${{ matrix.ndpid_examples }}; } + ${{ matrix.sanitizer }} ${{ matrix.ndpid_examples }} ${{ matrix.ndpid_rust_examples }}; } test $WGET_RET -ne 0 || { echo "Running Make.. (static)"; \ cmake --build ./build-local-static --verbose; } test $WGET_RET -ne 0 || { echo "Testing Executable.. (static)"; \ @@ -383,7 +396,7 @@ jobs: -DPFRING_LINK_STATIC=OFF \ -DPFRING_INSTALLDIR=/usr -DPFRING_KERNEL_INC="$(realpath ./PF_RING/kernel)" \ ${{ matrix.poll }} ${{ matrix.coverage }} ${{ matrix.ndpid_gcrypt }} \ - ${{ matrix.sanitizer }} ${{ matrix.ndpid_examples }}; } + ${{ matrix.sanitizer }} ${{ matrix.ndpid_examples }} ${{ matrix.ndpid_rust_examples }}; } test $WGET_RET -ne 0 || test ! -d ./PF_RING || { echo "Running Make.. (PF_RING)"; \ cmake --build ./build-local-pfring --verbose; } test $WGET_RET -ne 0 || test ! -d ./PF_RING || { echo "Testing Executable.. (PF_RING)"; \ diff --git a/CMakeLists.txt b/CMakeLists.txt index 1bcae37de..f254c6122 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -94,6 +94,7 @@ option(ENABLE_MEMORY_PROFILING "Enable dynamic memory tracking." OFF) option(ENABLE_ZLIB "Enable zlib support for nDPId (experimental)." OFF) option(ENABLE_SYSTEMD "Install systemd components." OFF) option(BUILD_EXAMPLES "Build C examples." ON) +option(BUILD_RUST_EXAMPLES "Build Rust examples." OFF) if(BUILD_EXAMPLES) option(ENABLE_DBUS "Build DBus notification example." OFF) option(ENABLE_CURL "Build influxdb data write example." OFF) @@ -555,6 +556,18 @@ if(BUILD_EXAMPLES) install(DIRECTORY examples/c-collectd/www DESTINATION share/nDPId/nDPIsrvd-collectd) endif() +if(BUILD_RUST_EXAMPLES) + add_custom_command( + OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/target/release/rs-simple + COMMAND cargo build --release + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/examples/rs-simple + COMMENT "Build Rust executable with cargo: rs-simple" + ) + add_custom_target(rs-simple ALL + DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/target/release/rs-simple + ) +endif() + if(ENABLE_SYSTEMD) configure_file(packages/systemd/ndpisrvd.service.in ndpisrvd.service @ONLY) configure_file(packages/systemd/ndpid@.service.in ndpid@.service @ONLY) @@ -621,6 +634,7 @@ message(STATUS "ENABLE_MEMORY_PROFILING..: ${ENABLE_MEMORY_PROFILING}") message(STATUS "ENABLE_ZLIB..............: ${ENABLE_ZLIB}") message(STATUS "BUILD_NDPI...............: ${BUILD_NDPI}") message(STATUS "BUILD_EXAMPLES...........: ${BUILD_EXAMPLES}") +message(STATUS "BUILD_RUST_EXAMPLES......: ${BUILD_RUST_EXAMPLES}") if(BUILD_EXAMPLES) message(STATUS "ENABLE_DBUS..............: ${ENABLE_DBUS}") message(STATUS "ENABLE_CURL..............: ${ENABLE_CURL}") diff --git a/examples/README.md b/examples/README.md index 524fa489d..0a75883be 100644 --- a/examples/README.md +++ b/examples/README.md @@ -93,7 +93,11 @@ Required by `tests/run_tests.sh` Validate nDPId JSON messages against internal event semantics. Required by `tests/run_tests.sh` +## rs-simple + +A straight forward Rust deserialization/parsing example. + ## yaml-filebeat An example filebeat configuration to parse and send nDPId JSON messages to Elasticsearch. Allowing long term storage and data visualization with kibana -and various other tools that interact with Elasticsearch (No logstash required).
\ No newline at end of file +and various other tools that interact with Elasticsearch (No logstash required). diff --git a/examples/rs-simple/Cargo.toml b/examples/rs-simple/Cargo.toml new file mode 100644 index 000000000..b5e0eebb5 --- /dev/null +++ b/examples/rs-simple/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "rs-simple" +version = "0.1.0" +authors = ["Toni Uhlig <toni@impl.cc>"] +edition = "2024" + +[dependencies] +bytes = "1" +crossterm = "0.29.0" +io = "0.0.2" +moka = { version = "0.12.10", features = ["future"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1", features = ["full"] } +tui = "0.19.0" + +[profile.release] +strip = true +lto = true +codegen-units = 1 diff --git a/examples/rs-simple/src/main.rs b/examples/rs-simple/src/main.rs new file mode 100644 index 000000000..647c351c2 --- /dev/null +++ b/examples/rs-simple/src/main.rs @@ -0,0 +1,590 @@ +use bytes::BytesMut; +use crossterm::{ + cursor, + event::{self, KeyCode, KeyEvent}, + ExecutableCommand, + terminal::{self, ClearType}, +}; +use moka::{future::Cache, Expiry}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::{ + fmt, + hash::{Hash, Hasher}, + io::self, + sync::Arc, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; +use tokio::io::AsyncReadExt; +use tokio::sync::mpsc; +use tokio::sync::Mutex; +use tokio::sync::MutexGuard; +use tokio::net::TcpStream; +use tui::{ + backend::CrosstermBackend, + layout::{Layout, Constraint, Direction}, + style::{Style, Color, Modifier}, + Terminal, + widgets::{Block, Borders, List, ListItem, Row, Table, TableState}, +}; + +#[derive(Debug)] +enum ParseError { + Protocol(), + Json(), + Schema(), +} + +impl From<serde_json::Error> for ParseError { + fn from(_: serde_json::Error) -> Self { + ParseError::Json() + } +} + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[serde(rename_all = "lowercase")] +enum EventName { + Invalid, New, End, Idle, Update, Analyse, + Guessed, Detected, + #[serde(rename = "detection-update")] + DetectionUpdate, + #[serde(rename = "not-detected")] + NotDetected, +} + +#[derive(Serialize, Deserialize, Copy, Clone, Debug)] +#[serde(rename_all = "lowercase")] +enum State { + Unknown, Info, Finished, +} + +#[derive(Serialize, Deserialize, Debug)] +struct FlowEvent { + #[serde(rename = "flow_event_name")] + name: EventName, + #[serde(rename = "flow_id")] + id: u64, + #[serde(rename = "flow_state")] + state: State, + #[serde(rename = "flow_first_seen")] + first_seen: u64, + #[serde(rename = "flow_src_last_pkt_time")] + src_last_pkt_time: u64, + #[serde(rename = "flow_dst_last_pkt_time")] + dst_last_pkt_time: u64, + #[serde(rename = "flow_idle_time")] + idle_time: u64, + #[serde(rename = "flow_src_packets_processed")] + src_packets_processed: u64, + #[serde(rename = "flow_dst_packets_processed")] + dst_packets_processed: u64, + #[serde(rename = "flow_src_tot_l4_payload_len")] + src_tot_l4_payload_len: u64, + #[serde(rename = "flow_dst_tot_l4_payload_len")] + dst_tot_l4_payload_len: u64, +} + +#[derive(Serialize, Deserialize, Debug)] +struct PacketEvent { + pkt_datalink: u16, + pkt_caplen: u64, + pkt_len: u64, + pkt_l4_len: u64, +} + +#[derive(Debug)] +enum EventType { + Flow(FlowEvent), + Packet(PacketEvent), + Other(), +} + +#[derive(Default)] +struct Stats { + ui_updates: u64, + flow_count: u64, + parse_errors: u64, + events: u64, + flow_events: u64, + packet_events: u64, + total_caplen: u64, + total_len: u64, + total_l4_len: u64, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum Expiration { + FlowIdleTime(u64), +} + +struct FlowExpiry; + +#[derive(Clone, Eq, Default, Debug)] +struct FlowKey { + id: u64, +} + +#[derive(Clone, Debug)] +struct FlowValue { + state: State, + total_src_packets: u64, + total_dst_packets: u64, + total_src_bytes: u64, + total_dst_bytes: u64, + first_seen: std::time::SystemTime, + last_seen: std::time::SystemTime, + timeout_in: std::time::SystemTime, +} + +impl Default for State { + fn default() -> State { + State::Unknown + } +} + +impl fmt::Display for State { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + State::Unknown => write!(f, "N/A"), + State::Info => write!(f, "Info"), + State::Finished => write!(f, "Finished"), + } + } +} + +impl Expiration { + fn as_duration(&self) -> Option<Duration> { + match self { + Expiration::FlowIdleTime(value) => Some(Duration::from_micros(*value)), + } + } +} + +impl fmt::Display for Expiration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.as_duration() { + Some(duration) => { + let secs = duration.as_secs(); + write!(f, "{} s", secs) + } + None => write!(f, "N/A"), + } + } +} + +impl Expiry<FlowKey, (Expiration, FlowValue)> for FlowExpiry { + fn expire_after_create( + &self, + _key: &FlowKey, + value: &(Expiration, FlowValue), + _current_time: Instant, + ) -> Option<Duration> { + value.0.as_duration() + } +} + +impl Hash for FlowKey { + fn hash<H: Hasher>(&self, state: &mut H) { + self.id.hash(state) + } +} + +impl PartialEq for FlowKey { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +#[tokio::main] +async fn main() { + let server_address = "127.0.0.1:7000"; + + let mut stream = match TcpStream::connect(server_address).await { + Ok(stream) => stream, + Err(e) => { + eprintln!("Connection to {} failed: {}", server_address, e); + return; + } + }; + if let Err(e) = terminal::enable_raw_mode() { + eprintln!("Could not enable terminal raw mode: {}", e); + return; + } + let mut stdout = io::stdout(); + if let Err(e) = stdout.execute(terminal::Clear(ClearType::All)) { + eprintln!("Could not clear your terminal: {}", e); + return; + } + if let Err(e) = stdout.execute(cursor::Hide) { + eprintln!("Could not hide your cursor: {}", e); + return; + } + let backend = CrosstermBackend::new(stdout); + let mut terminal = Terminal::new(backend); + + let mut buffer = BytesMut::with_capacity(33792usize); + let (tx, mut rx): (mpsc::Sender<String>, mpsc::Receiver<String>) = mpsc::channel(1024); + let data = Arc::new(Mutex::new(Stats::default())); + let data_tx = Arc::clone(&data); + let data_rx = Arc::clone(&data); + let flow_cache: Arc<Cache<FlowKey, (Expiration, FlowValue)>> = Arc::new(Cache::builder() + .expire_after(FlowExpiry) + .build()); + let flow_cache_rx = Arc::clone(&flow_cache); + + tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + match parse_json(&msg) { + Ok(message) => { + let mut data_lock = data_tx.lock().await; + data_lock.events += 1; + update_stats(&message, &mut data_lock, &flow_cache).await; + } + Err(_message) => { + let mut data_lock = data_tx.lock().await; + data_lock.parse_errors += 1; + } + } + } + }); + tokio::spawn(async move { + loop { + let n = match stream.read_buf(&mut buffer).await { + Ok(len) => len, + Err(_) => { + continue; // Versuche es erneut, wenn ein Fehler auftritt + } + }; + if n == 0 { + break; + } + + while let Some(message) = parse_message(&mut buffer) { + match tx.send(message).await { + Ok(_) => (), + Err(_) => return + } + } + } + }); + + let mut table_state = TableState::default(); + + loop { + let flows: Vec<(FlowKey, (Expiration, FlowValue))> = flow_cache_rx.iter().map(|(k, v)| (k.as_ref().clone(), v.clone())).collect(); + let mut table_selected = match table_state.selected() { + Some(table_index) => { + if flows.len() > 0 && table_index >= flows.len() { + flows.len() - 1 + } else { + table_index + } + } + None => 0, + }; + + match read_keypress() { + Some(KeyCode::Esc) => break, + Some(KeyCode::Char('q')) => break, + Some(KeyCode::Up) => { + table_selected = match table_selected { + _ if flows.len() == 0 => 0, + i if i == 0 => flows.len() - 1, + i => i - 1, + }; + }, + Some(KeyCode::Down) => { + table_selected = match table_selected { + i if flows.len() == 0 || i >= flows.len() - 1 => 0, + i => i + 1, + }; + }, + Some(KeyCode::PageUp) => { + table_selected = match table_selected { + _ if flows.len() == 0 => 0, + i if i == 0 => flows.len() - 1, + i if i < 10 => 0, + i => i - 10, + }; + }, + Some(KeyCode::PageDown) => { + table_selected = match table_selected { + i if flows.len() == 0 || i >= flows.len() - 1 => 0, + i if flows.len() < 10 || i >= flows.len() - 10 => flows.len() - 1, + i => i + 10, + }; + }, + Some(KeyCode::Home) => { + table_selected = 0; + }, + Some(KeyCode::End) => { + table_selected = match table_selected { + _ if flows.len() == 0 => 0, + _ => flows.len() - 1, + }; + }, + Some(_) => (), + None => () + }; + + let mut data_lock = data_rx.lock().await; + data_lock.ui_updates += 1; + draw_ui(terminal.as_mut().unwrap(), &mut table_state, table_selected, &data_lock, &flows); + } + + if let Err(e) = terminal.unwrap().backend_mut().execute(cursor::Show) { + eprintln!("Could not show your cursor: {}", e); + return; + } + let mut stdout = io::stdout(); + if let Err(e) = stdout.execute(terminal::Clear(ClearType::All)) { + eprintln!("Could not clear your terminal: {}", e); + return; + } + if let Err(e) = terminal::disable_raw_mode() { + eprintln!("Could not disable raw mode: {}", e); + return; + } + println!("\nDone."); +} + +fn read_keypress() -> Option<KeyCode> { + if event::poll(Duration::from_millis(500)).unwrap() { + if let event::Event::Key(KeyEvent { code, .. }) = event::read().unwrap() { + return Some(code); + } + } + + None +} + +fn parse_message(buffer: &mut BytesMut) -> Option<String> { + if let Some(pos) = buffer.iter().position(|&b| b == b'\n') { + let message = buffer.split_to(pos + 1); + return Some(String::from_utf8_lossy(&message).to_string()); + } + + None +} + +fn parse_json(data: &str) -> Result<EventType, ParseError> { + let first_non_digit = data.find(|c: char| !c.is_ascii_digit()).unwrap_or(0); + let length_str = &data[0..first_non_digit]; + let length: usize = length_str.parse().unwrap_or(0); + if length == 0 { + return Err(ParseError::Protocol()); + } + + let json_str = &data[first_non_digit..first_non_digit + length]; + let value: Value = serde_json::from_str(json_str).map_err(|_| ParseError::Json()).unwrap(); + if value.get("flow_event_name").is_some() { + let flow_event: FlowEvent = serde_json::from_value(value)?; + return Ok(EventType::Flow(flow_event)); + } else if value.get("packet_event_name").is_some() { + let packet_event: PacketEvent = serde_json::from_value(value)?; + return Ok(EventType::Packet(packet_event)); + } else if value.get("daemon_event_name").is_some() || + value.get("error_event_name").is_some() { + return Ok(EventType::Other()); + } + + Err(ParseError::Schema()) +} + +async fn update_stats(event: &EventType, stats: &mut MutexGuard<'_, Stats>, cache: &Cache<FlowKey, (Expiration, FlowValue)>) { + match &event { + EventType::Flow(flow_event) => { + stats.flow_events += 1; + stats.flow_count = cache.entry_count(); + let key = FlowKey { id: flow_event.id }; + + if flow_event.name == EventName::End || + flow_event.name == EventName::Idle + { + cache.remove(&key).await; + return; + } + + let first_seen_seconds = flow_event.first_seen / 1_000_000; + let first_seen_nanos = (flow_event.first_seen % 1_000_000) * 1_000; + let first_seen_epoch = std::time::Duration::new(first_seen_seconds, first_seen_nanos as u32); + let first_seen_system = UNIX_EPOCH + first_seen_epoch; + + let last_seen = std::cmp::max(flow_event.src_last_pkt_time, + flow_event.dst_last_pkt_time); + let last_seen_seconds = last_seen / 1_000_000; + let last_seen_nanos = (last_seen % 1_000_000) * 1_000; + let last_seen_epoch = std::time::Duration::new(last_seen_seconds, last_seen_nanos as u32); + let last_seen_system = UNIX_EPOCH + last_seen_epoch; + + let timeout_seconds = (last_seen + flow_event.idle_time) / 1_000_000; + let timeout_nanos = ((last_seen + flow_event.idle_time) % 1_000_000) * 1_000; + let timeout_epoch = std::time::Duration::new(timeout_seconds, timeout_nanos as u32); + let timeout_system = UNIX_EPOCH + timeout_epoch; + + let value = FlowValue { + state: flow_event.state, + total_src_packets: flow_event.src_packets_processed, + total_dst_packets: flow_event.dst_packets_processed, + total_src_bytes: flow_event.src_tot_l4_payload_len, + total_dst_bytes: flow_event.dst_tot_l4_payload_len, + first_seen: first_seen_system, + last_seen: last_seen_system, + timeout_in: timeout_system, + }; + cache.insert(key, (Expiration::FlowIdleTime(flow_event.idle_time), value)).await; + } + EventType::Packet(packet_event) => { + stats.packet_events += 1; + stats.total_caplen += packet_event.pkt_caplen; + stats.total_len += packet_event.pkt_len; + stats.total_l4_len += packet_event.pkt_l4_len; + } + EventType::Other() => {} + } +} + +fn format_bytes(bytes: u64) -> String { + const KB: u64 = 1024; + const MB: u64 = KB * 1024; + const GB: u64 = MB * 1024; + + if bytes >= GB { + format!("{} GB", bytes / GB) + } else if bytes >= MB { + format!("{} MB", bytes / MB) + } else if bytes >= KB { + format!("{} kB", bytes / KB) + } else { + format!("{} B", bytes) + } +} + +fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, table_state: &mut TableState, table_selected: usize, data: &MutexGuard<Stats>, flows: &Vec<(FlowKey, (Expiration, FlowValue))>) { + let general_items = vec![ + ListItem::new("TUI Updates..: ".to_owned() + &data.ui_updates.to_string()), + ListItem::new("Flows Cached.: ".to_owned() + &data.flow_count.to_string()), + ListItem::new("Total Events.: ".to_owned() + &data.events.to_string()), + ListItem::new("Parse Errors.: ".to_owned() + &data.parse_errors.to_string()), + ListItem::new("Flow Events..: ".to_owned() + &data.flow_events.to_string()), + ListItem::new("Packet Events: ".to_owned() + &data.packet_events.to_string()), + ]; + let packet_items = vec![ + ListItem::new("Total Capture Length: ".to_owned() + &format_bytes(data.total_caplen)), + ListItem::new("Total Length........: ".to_owned() + &format_bytes(data.total_len)), + ListItem::new("Total L4 Length.....: ".to_owned() + &format_bytes(data.total_l4_len)), + ]; + let table_rows: Vec<Row> = flows + .into_iter() + .map(|(key, (_exp, val))| { + let first_seen_display = match val.first_seen.elapsed() { + Ok(elapsed) => { + match elapsed.as_secs() { + t if t > (3_600 * 24) => format!("{} d ago", t / (3_600 * 24)), + t if t > 3_600 => format!("{} h ago", t / 3_600), + t if t > 60 => format!("{} min ago", t / 60), + t if t > 0 => format!("{} s ago", t), + t if t == 0 => "< 1 s ago".to_string(), + t => format!("INVALID: {}", t), + } + } + Err(err) => format!("ERROR: {}", err) + }; + + let last_seen_display = match val.last_seen.elapsed() { + Ok(elapsed) => { + match elapsed.as_secs() { + t if t > (3_600 * 24) => format!("{} d ago", t / (3_600 * 24)), + t if t > 3_600 => format!("{} h ago", t / 3_600), + t if t > 60 => format!("{} min ago", t / 60), + t if t > 0 => format!("{} s ago", t), + t if t == 0 => "< 1 s ago".to_string(), + t => format!("INVALID: {}", t), + } + } + Err(_err) => "ERROR".to_string() + }; + + let timeout_display = match val.timeout_in.duration_since(SystemTime::now()) { + Ok(elapsed) => { + match elapsed.as_secs() { + t if t > (3_600 * 24) => format!("in {} d", t / (3_600 * 24)), + t if t > 3_600 => format!("in {} h", t / 3_600), + t if t > 60 => format!("in {} min", t / 60), + t if t > 0 => format!("in {} s", t), + t if t == 0 => "in < 1 s".to_string(), + t => format!("INVALID: {}", t), + } + } + Err(_err) => "EXPIRED".to_string() + }; + + Row::new(vec![ + key.id.to_string(), + val.state.to_string(), + first_seen_display, + last_seen_display, + timeout_display, + (val.total_src_packets + val.total_dst_packets).to_string(), + format_bytes(val.total_src_bytes + val.total_dst_bytes), + ]) + }) + .collect(); + + terminal.draw(|f| { + let size = f.size(); + + let chunks = Layout::default() + .direction(Direction::Vertical) + .constraints( + [ + Constraint::Percentage(18), + Constraint::Percentage(82), + ].as_ref() + ) + .split(size); + + let top_chunks = Layout::default() + .direction(Direction::Horizontal) + .constraints( + [ + Constraint::Percentage(50), + Constraint::Percentage(50), + ].as_ref() + ) + .split(chunks[0]); + + let table_selected_abs = match table_selected { + _ if flows.len() == 0 => 0, + i => i + 1, + }; + let table = Table::new(table_rows) + .header(Row::new(vec!["Flow ID", "State", "First Seen", "Last Seen", "Timeout", "Total Packets", "Total Bytes"]) + .style(Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD))) + .block(Block::default().title("Flow Table (selected: ".to_string() + + &table_selected_abs.to_string() + + "): " + + &flows.len().to_string() + + " item(s)").borders(Borders::ALL)) + .highlight_style(Style::default().bg(Color::Blue)) + .widths(&[ + Constraint::Length(20), + Constraint::Length(20), + Constraint::Length(20), + Constraint::Length(20), + Constraint::Length(20), + Constraint::Length(20), + Constraint::Length(20), + ]); + + let general_list = List::new(general_items) + .block(Block::default().title("General").borders(Borders::ALL)); + let packet_list = List::new(packet_items) + .block(Block::default().title("Packet Events").borders(Borders::ALL)); + + table_state.select(Some(table_selected)); + f.render_widget(general_list, top_chunks[0]); + f.render_widget(packet_list, top_chunks[1]); + f.render_stateful_widget(table, chunks[1], table_state); + }).unwrap(); +} |