Skip to content

Commit 6ec83d1

Browse files
committed
refactor: improve minor error handling #22
1 parent 0e0a33b commit 6ec83d1

File tree

4 files changed

+55
-36
lines changed

4 files changed

+55
-36
lines changed

src/main.rs

+23-13
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ pub mod types;
1111
pub mod utils;
1212

1313
use std::fs::File;
14-
use std::{fmt::Display, process::exit};
14+
use std::{fmt::Display, io, process::exit};
1515

1616
use crate::logger::{Logger, Logstdout};
17-
use crate::net::DeviceError;
17+
use crate::net::NetError;
1818
// use env_logger;::{init, Logger};
1919

2020
use log::{debug, Level, LevelFilter};
@@ -28,12 +28,12 @@ use log::{debug, Level, LevelFilter};
2828
// };
2929

3030
#[derive(Debug)]
31-
enum FluereError {
31+
pub enum FluereError {
3232
InterfaceNotFound,
33-
DeviceError(DeviceError),
3433
ArgumentParseError(String),
3534
ModeNotSupported(String),
36-
NetworkError(String),
35+
NetworkError(NetError),
36+
IoError(io::Error),
3737
}
3838

3939
impl std::fmt::Display for FluereError {
@@ -42,17 +42,23 @@ impl std::fmt::Display for FluereError {
4242
FluereError::InterfaceNotFound => write!(f, "Network interface not found."),
4343
FluereError::ArgumentParseError(msg) => write!(f, "Argument parsing error: {}", msg),
4444
FluereError::ModeNotSupported(mode) => write!(f, "Mode not supported: {}", mode),
45-
FluereError::NetworkError(msg) => write!(f, "Network error: {}", msg),
46-
FluereError::DeviceError(err) => err.fmt(f),
45+
FluereError::NetworkError(err) => err.fmt(f),
46+
FluereError::IoError(err) => err.fmt(f),
4747
}
4848
}
4949
}
5050

5151
impl std::error::Error for FluereError {}
5252

53-
impl From<DeviceError> for FluereError {
54-
fn from(err: DeviceError) -> Self {
55-
FluereError::DeviceError(err)
53+
impl From<NetError> for FluereError {
54+
fn from(err: NetError) -> Self {
55+
FluereError::NetworkError(err)
56+
}
57+
}
58+
59+
impl From<io::Error> for FluereError {
60+
fn from(err: io::Error) -> Self {
61+
FluereError::IoError(err)
5662
}
5763
}
5864

@@ -119,11 +125,15 @@ async fn main() {
119125
debug!("Fluere started");
120126

121127
match mode_type {
122-
Mode::Online => net::online_fluereflow::packet_capture(parems.0).await,
123-
Mode::Offline => net::fluereflow_fileparse(parems.0).await,
128+
Mode::Online => net::online_fluereflow::packet_capture(parems.0)
129+
.await
130+
.expect("Online mode failed"),
131+
Mode::Offline => net::fluereflow_fileparse(parems.0)
132+
.await
133+
.expect("Offline mode failed"),
124134
Mode::Live => net::live_fluereflow::packet_capture(parems.0)
125135
.await
126-
.expect("Error on live mode"),
136+
.expect("Live mode failed"),
127137
Mode::Pcap => net::pcap_capture(parems.0).await,
128138
}
129139
} else {

src/net/live_fluereflow.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::{
1111
},
1212
types::{Args, UDFlowKey},
1313
utils::{cur_time_file, fluere_exporter},
14+
FluereError,
1415
};
1516
use std::{
1617
collections::HashMap,
@@ -43,7 +44,7 @@ const MAX_RECENT_FLOWS: usize = 50;
4344
// This function is the entry point for the live packet capture functionality.
4445
// It takes the command line arguments as input and calls the online_packet_capture function.
4546
// It returns a Result indicating whether the operation was successful.
46-
pub async fn packet_capture(arg: Args) -> Result<(), io::Error> {
47+
pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
4748
debug!("Starting Terminal User Interface");
4849

4950
online_packet_capture(arg).await;

src/net/offline_fluereflows.rs

+24-19
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,21 @@ use crate::{
88
},
99
types::{Args, UDFlowKey},
1010
utils::{cur_time_file, fluere_exporter},
11+
FluereError, NetError,
1112
};
1213

1314
use fluereflow::FluereRecord;
1415
use log::{debug, info, trace};
1516
use pcap::Capture;
1617
use tokio::task;
1718

18-
pub async fn fluereflow_fileparse(arg: Args) {
19+
pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> {
1920
let csv_file = arg.files.csv.unwrap();
2021
let file_name = arg.files.file.unwrap();
2122
let use_mac = arg.parameters.use_mac.unwrap();
22-
let _flow_timeout = arg.parameters.timeout.unwrap();
23+
let flow_timeout = arg.parameters.timeout.unwrap();
2324

24-
let mut cap = Capture::from_file(file_name).unwrap();
25+
let mut cap = Capture::from_file(file_name).map_err(NetError::from)?;
2526

2627
let file_dir = "./output";
2728
match fs::create_dir_all(<&str>::clone(&file_dir)) {
@@ -82,21 +83,6 @@ pub async fn fluereflow_fileparse(arg: Args) {
8283
},
8384
Some(_) => false,
8485
};
85-
/*let is_reverse = if active_flow.contains_key(&key_value) {
86-
false
87-
} else if active_flow.contains_key(&reverse_key) {
88-
true
89-
} else {
90-
if flowdata.get_prot() != 6 && flags.syn > 0 {
91-
active_flow.insert(key_value, flowdata);
92-
if verbose >= 2 {
93-
println!("flow established");
94-
}
95-
} else {
96-
continue;
97-
}
98-
false
99-
};*/
10086

10187
let time = parse_microseconds(
10288
packet.header.ts.tv_sec as u64,
@@ -129,8 +115,26 @@ pub async fn fluereflow_fileparse(arg: Args) {
129115
active_flow.remove(flow_key);
130116
}
131117
}
118+
119+
// Before processing a new packet, check for and handle expired flows
120+
let mut expired_flows = Vec::new();
121+
for (key, flow) in active_flow.iter() {
122+
if flow_timeout > 0 && time > (flow.last + (flow_timeout * 1000)) {
123+
// Assuming flow.last is in microseconds
124+
trace!("flow expired");
125+
trace!("flow data: {:?}", flow);
126+
records.push(*flow);
127+
expired_flows.push(*key);
128+
}
129+
}
130+
131+
// Remove expired flows from the active flows map
132+
// active_flow.retain(|key, _| !expired_flows.contains(key));
133+
for key in expired_flows {
134+
active_flow.remove(&key);
135+
}
132136
}
133-
info!("Captured in {:?}", start.elapsed());
137+
info!("Converted in {:?}", start.elapsed());
134138
let ac_flow_cnt = active_flow.len();
135139
let ended_flow_cnt = records.len();
136140

@@ -146,4 +150,5 @@ pub async fn fluereflow_fileparse(arg: Args) {
146150

147151
info!("Active flow {:?}", ac_flow_cnt);
148152
info!("Ended flow {:?}", ended_flow_cnt);
153+
Ok(())
149154
}

src/net/online_fluereflow.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::{
1818
},
1919
types::{Args, UDFlowKey},
2020
utils::{cur_time_file, fluere_exporter},
21+
FluereError, NetError,
2122
};
2223

2324
use fluere_config::Config;
@@ -35,7 +36,7 @@ use log::{debug, info, trace};
3536
// This function captures packets from a network interface and converts them into NetFlow data.
3637
// It takes the command line arguments as input, which specify the network interface to capture from and other parameters.
3738
// The function runs indefinitely, capturing packets and exporting the captured data to a CSV file.
38-
pub async fn packet_capture(arg: Args) {
39+
pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
3940
let csv_file = arg.files.csv.unwrap();
4041
let use_mac = arg.parameters.use_mac.unwrap();
4142
let interface_name = arg.interface.expect("interface not found");
@@ -52,8 +53,8 @@ pub async fn packet_capture(arg: Args) {
5253
.await
5354
.expect("Failed to load plugins");
5455

55-
let interface = find_device(interface_name.as_str()).unwrap();
56-
let mut cap_device = CaptureDevice::new(interface.clone()).unwrap();
56+
let interface = find_device(interface_name.as_str())?;
57+
let mut cap_device = CaptureDevice::new(interface.clone()).map_err(NetError::from)?;
5758
let cap = &mut cap_device.capture;
5859

5960
let file_dir = "./output";
@@ -252,4 +253,6 @@ pub async fn packet_capture(arg: Args) {
252253
drop(plugin_manager);
253254
let result = tasks.await;
254255
info!("Exporting task excutation result: {:?}", result);
256+
257+
Ok(())
255258
}

0 commit comments

Comments
 (0)