@@ -20,6 +20,7 @@ use crate::{
20
20
types:: { Args , UDFlowKey } ,
21
21
utils:: { cur_time_file, fluere_exporter} ,
22
22
FluereError ,
23
+ error:: OptionExt ,
23
24
} ;
24
25
25
26
use fluere_config:: Config ;
@@ -37,14 +38,14 @@ use tokio::{task, task::JoinHandle};
37
38
// It takes the command line arguments as input, which specify the network interface to capture from and other parameters.
38
39
// The function runs indefinitely, capturing packets and exporting the captured data to a CSV file.
39
40
pub async fn packet_capture ( arg : Args ) -> Result < ( ) , FluereError > {
40
- let csv_file = arg. files . csv . expect ( "this should be defaulted to `output` on construction" ) ;
41
+ let csv_file = arg. files . csv . required ( "this should be defaulted to `output` on construction" ) ? ;
41
42
//let enable_ipv6
42
- let use_mac = arg. parameters . use_mac . expect ( "this should be defaulted to `false` on construction" ) ;
43
- let interface_name = arg. interface . expect ( "interface not found" ) ;
44
- let duration = arg. parameters . duration . expect ( "this should be defaulted to `0(infinite)` on construction" ) ;
45
- let interval = arg. parameters . interval . expect ( "this should be defaulted to `30 minutes` on construction" ) ;
46
- let flow_timeout = arg. parameters . timeout . expect ( "this should be defaulted to `10 minutes` on construction" ) ;
47
- let _sleep_windows = arg. parameters . sleep_windows . expect ( "this should be defaulted to `false`, and now deprecated" ) ;
43
+ let use_mac = arg. parameters . use_mac . required ( "this should be defaulted to `false` on construction" ) ? ;
44
+ let interface_name = arg. interface . required ( "interface should be provided" ) ? ;
45
+ let duration = arg. parameters . duration . required ( "this should be defaulted to `0(infinite)` on construction" ) ? ;
46
+ let interval = arg. parameters . interval . required ( "this should be defaulted to `30 minutes` on construction" ) ? ;
47
+ let flow_timeout = arg. parameters . timeout . required ( "this should be defaulted to `10 minutes` on construction" ) ? ;
48
+ let _sleep_windows = arg. parameters . sleep_windows . required ( "this should be defaulted to `false`, and now deprecated" ) ? ;
48
49
let config = Config :: new ( ) ;
49
50
let plugin_manager = PluginManager :: new ( ) . expect ( "Failed to create plugin manager" ) ;
50
51
let plugin_worker = plugin_manager. start_worker ( ) ;
@@ -81,7 +82,10 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
81
82
82
83
loop {
83
84
match cap. next_packet ( ) {
84
- Err ( _) => continue ,
85
+ Err ( e) => {
86
+ trace ! ( "Error capturing packet: {}" , e) ;
87
+ continue ;
88
+ }
85
89
Ok ( packet) => {
86
90
trace ! ( "received packet" ) ;
87
91
@@ -172,11 +176,11 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
172
176
trace ! ( "flow key detail: {:?}" , flow_key) ;
173
177
174
178
// Check if the flow has finished
175
- if flags. fin == 1 || flags . rst == 1 {
179
+ if flags. is_finished ( ) {
176
180
trace ! ( "flow finished" ) ;
177
181
trace ! ( "flow data: {:?}" , flow) ;
178
182
179
- plugin_manager. process_flow_data ( * flow) . await . unwrap ( ) ;
183
+ plugin_manager. process_flow_data ( * flow) . await . map_err ( |e| FluereError :: PluginError ( e . to_string ( ) ) ) ? ;
180
184
records. push ( * flow) ;
181
185
182
186
active_flow. remove ( flow_key) ;
@@ -203,7 +207,7 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
203
207
for key in keys {
204
208
if let Some ( flow) = active_flow. remove ( & key) {
205
209
trace ! ( "flow expired" ) ;
206
- plugin_manager. process_flow_data ( flow) . await . unwrap ( ) ;
210
+ plugin_manager. process_flow_data ( flow) . await . map_err ( |e| FluereError :: PluginError ( e . to_string ( ) ) ) ? ;
207
211
records. push ( flow) ;
208
212
}
209
213
}
@@ -246,15 +250,15 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
246
250
for ( _exp_time, keys) in flow_expirations. iter ( ) {
247
251
for key in keys {
248
252
if let Some ( flow) = active_flow. remove ( key) {
249
- plugin_manager. process_flow_data ( flow) . await . unwrap ( ) ;
253
+ plugin_manager. process_flow_data ( flow) . await . map_err ( |e| FluereError :: PluginError ( e . to_string ( ) ) ) ? ;
250
254
records. push ( flow) ;
251
255
}
252
256
}
253
257
}
254
258
255
259
debug ! ( "Captured in {:?}" , start. elapsed( ) ) ;
256
260
for ( _key, flow) in active_flow. iter ( ) {
257
- plugin_manager. process_flow_data ( * flow) . await . unwrap ( ) ;
261
+ plugin_manager. process_flow_data ( * flow) . await . map_err ( |e| FluereError :: PluginError ( e . to_string ( ) ) ) ? ;
258
262
records. push ( * flow) ;
259
263
}
260
264
for task in tasks {
0 commit comments