diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2025-05-14 12:36:38 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2025-05-14 12:36:38 +0200 |
commit | 07d60181090b6dd24f8a809769be4902b2f8f4c9 (patch) | |
tree | 501a0a113c188c059498993df7c718feb58df4b4 | |
parent | dd909adeb846a6e609aba8a651f1e43dd6b56874 (diff) |
rs-simple: make primitive flow table work
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rw-r--r-- | examples/rs-simple/src/main.rs | 176 |
1 files changed, 154 insertions, 22 deletions
diff --git a/examples/rs-simple/src/main.rs b/examples/rs-simple/src/main.rs index 1647c5c70..c572f7e94 100644 --- a/examples/rs-simple/src/main.rs +++ b/examples/rs-simple/src/main.rs @@ -9,10 +9,11 @@ use moka::{future::Cache, Expiry}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::{ + fmt, hash::{Hash, Hasher}, io::self, sync::Arc, - time::{Duration, Instant}, + time::{Duration, Instant, UNIX_EPOCH}, }; use tokio::io::AsyncReadExt; use tokio::sync::mpsc; @@ -22,9 +23,9 @@ use tokio::net::TcpStream; use tui::{ backend::CrosstermBackend, layout::{Layout, Constraint, Direction}, - style::{Style, Color}, + style::{Style, Color, Modifier}, Terminal, - widgets::{Block, Borders, List, ListItem}, + widgets::{Block, Borders, List, ListItem, Row, Table, TableState}, }; #[derive(Debug)] @@ -51,10 +52,10 @@ enum EventName { NotDetected, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Copy, Clone, Debug)] #[serde(rename_all = "lowercase")] enum State { - Info, Finished, + Unknown, Info, Finished, } #[derive(Serialize, Deserialize, Debug)] @@ -65,6 +66,12 @@ struct FlowEvent { id: u64, #[serde(rename = "flow_state")] state: State, + #[serde(rename = "flow_first_seen")] + first_seen: u64, + #[serde(rename = "flow_src_last_pkt_time")] + src_last_pkt_time: u64, + #[serde(rename = "flow_dst_last_pkt_time")] + dst_last_pkt_time: u64, #[serde(rename = "flow_idle_time")] idle_time: u64, #[serde(rename = "flow_src_packets_processed")] @@ -112,13 +119,35 @@ enum Expiration { struct FlowExpiry; -#[derive(Eq, Default, Debug)] +#[derive(Clone, Eq, Default, Debug)] struct FlowKey { id: u64, } -#[derive(Clone, Default, Debug)] +#[derive(Clone, Debug)] struct FlowValue { + state: State, + total_src_packets: u64, + total_dst_packets: u64, + total_src_bytes: u64, + total_dst_bytes: u64, + first_seen: (u64, u64, u64, u64), +} + +impl Default for State { + fn default() -> State { + State::Unknown + } +} + +impl fmt::Display for State { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + State::Unknown => write!(f, "N/A"), + State::Info => write!(f, "Info"), + State::Finished => write!(f, "Finished"), + } + } } impl Expiration { @@ -129,6 +158,18 @@ impl Expiration { } } +impl fmt::Display for Expiration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.as_duration() { + Some(duration) => { + let secs = duration.as_secs(); + write!(f, "{} s", secs) + } + None => write!(f, "N/A"), + } + } +} + impl Expiry<FlowKey, (Expiration, FlowValue)> for FlowExpiry { fn expire_after_create( &self, @@ -184,13 +225,12 @@ async fn main() { let data = Arc::new(Mutex::new(Stats::default())); let data_tx = Arc::clone(&data); let data_rx = Arc::clone(&data); + let flow_cache: Arc<Cache<FlowKey, (Expiration, FlowValue)>> = Arc::new(Cache::builder() + .expire_after(FlowExpiry) + .build()); + let flow_cache_rx = Arc::clone(&flow_cache); tokio::spawn(async move { - let expiry = FlowExpiry; - let flow_cache: Cache<FlowKey, (Expiration, FlowValue)> = Cache::builder() - .expire_after(expiry) - .build(); - while let Some(msg) = rx.recv().await { match parse_json(&msg) { Ok(message) => { @@ -226,16 +266,45 @@ async fn main() { } }); + let mut table_state = TableState::default(); + loop { + let flows: Vec<(FlowKey, (Expiration, FlowValue))> = flow_cache_rx.iter().map(|(k, v)| (k.as_ref().clone(), v.clone())).collect(); + let mut table_selected = match table_state.selected() { + Some(table_index) => { + if flows.len() > 0 && table_index >= flows.len() { + flows.len() - 1 + } else { + table_index + } + } + None => 0, + }; + match read_keypress() { Some(KeyCode::Esc) => break, Some(KeyCode::Char('q')) => break, + Some(KeyCode::Up) => { + table_selected = match table_selected { + i if i == 0 && flows.len() == 0 => 0, + i if i == 0 => flows.len() - 1, + i => i - 1, + }; + }, + Some(KeyCode::Down) => { + table_selected = match table_selected { + i if flows.len() == 0 || i >= flows.len() - 1 => 0, + i => i + 1, + }; + }, + Some(KeyCode::Enter) => break, Some(_) => (), None => () }; + let mut data_lock = data_rx.lock().await; data_lock.ui_updates += 1; - draw_ui(terminal.as_mut().unwrap(), &data_lock); + draw_ui(terminal.as_mut().unwrap(), &mut table_state, table_selected, &data_lock, &flows); } if let Err(e) = terminal.unwrap().backend_mut().execute(cursor::Show) { @@ -303,8 +372,30 @@ async fn update_stats(event: &EventType, stats: &mut MutexGuard<'_, Stats>, cach stats.flow_events += 1; stats.flow_count = cache.entry_count(); + let first_seen_seconds = flow_event.first_seen / 1_000_000; + let first_seen_nanos = (flow_event.first_seen % 1_000_000) * 1_000; + let first_seen_epoch = std::time::Duration::new(first_seen_seconds, first_seen_nanos as u32); + let first_seen_system = UNIX_EPOCH + first_seen_epoch; + let time_tuple = match first_seen_system.elapsed() { + Ok(elapsed) => { + let seconds = elapsed.as_secs(); + let minutes = seconds / 60; + let hours = minutes / 60; + let days = hours / 24; + (seconds, minutes, hours, days) + } + Err(_) => (0, 0, 0, 0) + }; + let key = FlowKey { id: flow_event.id }; - let value = FlowValue {}; + let value = FlowValue { + state: flow_event.state, + total_src_packets: flow_event.src_packets_processed, + total_dst_packets: flow_event.dst_packets_processed, + total_src_bytes: flow_event.src_tot_l4_payload_len, + total_dst_bytes: flow_event.dst_tot_l4_payload_len, + first_seen: time_tuple, + }; cache.insert(key, (Expiration::FlowIdleTime(flow_event.idle_time), value)).await; } EventType::Packet(packet_event) => { @@ -317,10 +408,10 @@ async fn update_stats(event: &EventType, stats: &mut MutexGuard<'_, Stats>, cach } } -fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, data: &MutexGuard<Stats>) { +fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, table_state: &mut TableState, table_selected: usize, data: &MutexGuard<Stats>, flows: &Vec<(FlowKey, (Expiration, FlowValue))>) { let general_items = vec![ ListItem::new("TUI Updates..: ".to_owned() + &data.ui_updates.to_string()), - ListItem::new("Flow Count...: ".to_owned() + &data.flow_count.to_string()), + ListItem::new("Flows Cached.: ".to_owned() + &data.flow_count.to_string()), ListItem::new("Total Events.: ".to_owned() + &data.events.to_string()), ListItem::new("Parse Errors.: ".to_owned() + &data.parse_errors.to_string()), ListItem::new("Flow Events..: ".to_owned() + &data.flow_events.to_string()), @@ -331,6 +422,29 @@ fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, data: &MutexGua ListItem::new("Total Length........: ".to_owned() + &data.total_len.to_string()), ListItem::new("Total L4 Length.....: ".to_owned() + &data.total_l4_len.to_string()), ]; + let table_rows: Vec<Row> = flows + .into_iter() + .map(|(key, (exp, val))| { + let first_seen_display = match (val.first_seen.0, val.first_seen.1, + val.first_seen.2, val.first_seen.3) + { + (_, _, _, d) if d > 0 => format!("{} day(s) ago", d), + (_, _, h, _) if h > 0 => format!("{} hour(s) ago", h), + (_, m, _, _) if m > 0 => format!("{} min(s) ago", m), + (s, _, _, _) if s > 0 => format!("{} sec(s) ago", s), + _ => format!("{} sec(s) ago", val.first_seen.0), + }; + + Row::new(vec![ + key.id.to_string(), + val.state.to_string(), + first_seen_display, + exp.to_string(), + (val.total_src_packets + val.total_dst_packets).to_string(), + (val.total_src_bytes + val.total_dst_bytes).to_string(), + ]) + }) + .collect(); terminal.draw(|f| { let size = f.size(); @@ -355,18 +469,36 @@ fn draw_ui<B: tui::backend::Backend>(terminal: &mut Terminal<B>, data: &MutexGua ) .split(chunks[0]); - let right_block = Block::default() - .title("Unused Bottom Panel") - .borders(Borders::ALL) - .style(Style::default().fg(Color::Black).bg(Color::Green)); + let table_selected_abs = match table_selected { + _ if flows.len() == 0 => 0, + i => i + 1, + }; + let table = Table::new(table_rows) + .header(Row::new(vec!["Flow ID", "State", "First Seen", "Timeout", "Total Packets", "Total Bytes"]) + .style(Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD))) + .block(Block::default().title("Flow Table (selected: ".to_string() + + &table_selected_abs.to_string() + + "): " + + &flows.len().to_string() + + " item(s)").borders(Borders::ALL)) + .highlight_style(Style::default().bg(Color::Blue)) + .widths(&[ + Constraint::Length(20), + Constraint::Length(20), + Constraint::Length(20), + Constraint::Length(20), + Constraint::Length(20), + Constraint::Length(20), + ]); let general_list = List::new(general_items) .block(Block::default().title("General").borders(Borders::ALL)); let packet_list = List::new(packet_items) - .block(Block::default().title("Packet").borders(Borders::ALL)); + .block(Block::default().title("Packet Events").borders(Borders::ALL)); + table_state.select(Some(table_selected)); f.render_widget(general_list, top_chunks[0]); f.render_widget(packet_list, top_chunks[1]); - f.render_widget(right_block, chunks[1]); + f.render_stateful_widget(table, chunks[1], table_state); }).unwrap(); } |