Skip to content

Commit

Permalink
fix: break fragment chain when dropping the last ones
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Nov 1, 2024
1 parent bb24e95 commit a1a0192
Showing 1 changed file with 24 additions and 3 deletions.
27 changes: 24 additions & 3 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ use zenoh_codec::{transport::batch::BatchError, WCodec, Zenoh080};
use zenoh_config::QueueSizeConf;
use zenoh_core::zlock;
use zenoh_protocol::{
common::ZExtBody,
core::Priority,
network::NetworkMessage,
network::{NetworkBody, NetworkMessage, Oam},
transport::{
fragment::FragmentHeader,
frame::{self, FrameHeader},
Expand Down Expand Up @@ -100,7 +101,7 @@ impl StageInOut {

#[inline]
fn move_batch(&mut self, batch: WBatch) {
let _ = self.s_out_w.push(batch);
assert!(self.s_out_w.push(batch).is_none());
self.atomic_backoff.bytes.store(0, Ordering::Relaxed);
let _ = self.n_out_w.notify();
}
Expand Down Expand Up @@ -202,6 +203,7 @@ struct StageIn {
mutex: StageInMutex,
fragbuf: ZBuf,
batching: bool,
fragments_dropped: bool,
}

impl StageIn {
Expand All @@ -211,6 +213,24 @@ impl StageIn {
priority: Priority,
deadline: &mut Deadline,
) -> bool {
if self.fragments_dropped {
let mut stop_fragments = NetworkMessage {
body: NetworkBody::OAM(Oam {
id: u16::MAX,
body: ZExtBody::Unit,
ext_qos: Default::default(),
ext_tstamp: None,
}),
reliability: msg.reliability,
#[cfg(feature = "stats")]
size: None,
};
self.fragments_dropped = false;
if !self.push_network_message(&mut stop_fragments, priority, deadline) {
self.fragments_dropped = true;
return false;
}
}
// Lock the current serialization batch.
let mut c_guard = self.mutex.current();

Expand Down Expand Up @@ -322,7 +342,7 @@ impl StageIn {
let mut reader = self.fragbuf.reader();
while reader.can_read() {
// Get the current serialization batch
batch = zgetbatch_rets!(tch.sn.set(sn).unwrap());
batch = zgetbatch_rets!(self.fragments_dropped = true);

// Serialize the message fragment
match batch.encode((&mut reader, &mut fragment)) {
Expand Down Expand Up @@ -640,6 +660,7 @@ impl TransmissionPipeline {
},
fragbuf: ZBuf::empty(),
batching: config.batching_enabled,
fragments_dropped: false,
}));

// The stage out for this priority
Expand Down

0 comments on commit a1a0192

Please sign in to comment.