= @@ -25,6 +27,9 @@ type HeadersQueue
= type KnownHeaders
= BTreeMap<
::Number, HashMap<
::Hash, HeaderStatus>>;
+/// We're trying to fetch completion data for single header at this interval.
+const RETRY_FETCH_COMPLETION_INTERVAL: Duration = Duration::from_secs(20);
+
/// Ethereum headers queue.
#[derive(Debug)]
pub struct QueuedHeaders ,
/// Headers that are ready to be submitted to target node.
ready: HeadersQueue ,
+ /// Headers that are ready to be submitted to target node, but their ancestor is incomplete.
+ /// Thus we're waiting for these ancestors to be completed first.
+ /// Note that the incomplete header itself is synced and it isn't in this queue.
+ incomplete: HeadersQueue ,
/// Headers that are (we believe) currently submitted to target node by our,
/// not-yet mined transactions.
submitted: HeadersQueue ,
/// Pointers to all headers that we ever seen and we believe we can touch in the future.
known_headers: KnownHeaders ,
+ /// Headers that are waiting for completion data from source node. Mapped (and auto-sorted
+ /// by) to the last fetch time.
+ incomplete_headers: LinkedHashMap {
/// Returns new QueuedHeaders.
pub fn new() -> Self {
@@ -62,8 +85,11 @@ impl {
maybe_extra: HeadersQueue::new(),
extra: HeadersQueue::new(),
ready: HeadersQueue::new(),
+ incomplete: HeadersQueue::new(),
submitted: HeadersQueue::new(),
known_headers: KnownHeaders:: ::new(),
+ incomplete_headers: LinkedHashMap::new(),
+ completion_data: LinkedHashMap::new(),
prune_border: Zero::zero(),
}
}
@@ -89,6 +115,7 @@ impl {
.fold(0, |total, headers| total + headers.len()),
HeaderStatus::Extra => self.extra.values().fold(0, |total, headers| total + headers.len()),
HeaderStatus::Ready => self.ready.values().fold(0, |total, headers| total + headers.len()),
+ HeaderStatus::Incomplete => self.incomplete.values().fold(0, |total, headers| total + headers.len()),
HeaderStatus::Submitted => self.submitted.values().fold(0, |total, headers| total + headers.len()),
}
}
@@ -105,6 +132,7 @@ impl {
.fold(0, |total, headers| total + headers.len())
+ self.extra.values().fold(0, |total, headers| total + headers.len())
+ self.ready.values().fold(0, |total, headers| total + headers.len())
+ + self.incomplete.values().fold(0, |total, headers| total + headers.len())
}
/// Returns number of best block in the queue.
@@ -119,7 +147,10 @@ impl {
self.extra.keys().next_back().cloned().unwrap_or_else(Zero::zero),
std::cmp::max(
self.ready.keys().next_back().cloned().unwrap_or_else(Zero::zero),
- self.submitted.keys().next_back().cloned().unwrap_or_else(Zero::zero),
+ std::cmp::max(
+ self.incomplete.keys().next_back().cloned().unwrap_or_else(Zero::zero),
+ self.submitted.keys().next_back().cloned().unwrap_or_else(Zero::zero),
+ ),
),
),
),
@@ -145,6 +176,7 @@ impl {
HeaderStatus::MaybeExtra => oldest_header(&self.maybe_extra),
HeaderStatus::Extra => oldest_header(&self.extra),
HeaderStatus::Ready => oldest_header(&self.ready),
+ HeaderStatus::Incomplete => oldest_header(&self.incomplete),
HeaderStatus::Submitted => oldest_header(&self.submitted),
}
}
@@ -162,6 +194,7 @@ impl {
HeaderStatus::MaybeExtra => oldest_headers(&self.maybe_extra, f),
HeaderStatus::Extra => oldest_headers(&self.extra, f),
HeaderStatus::Ready => oldest_headers(&self.ready, f),
+ HeaderStatus::Incomplete => oldest_headers(&self.incomplete, f),
HeaderStatus::Submitted => oldest_headers(&self.submitted, f),
}
}
@@ -207,6 +240,7 @@ impl {
HeaderStatus::MaybeExtra
| HeaderStatus::Extra
| HeaderStatus::Ready
+ | HeaderStatus::Incomplete
| HeaderStatus::Submitted
| HeaderStatus::Synced => {
insert_header(&mut self.maybe_extra, id, header);
@@ -226,67 +260,7 @@ impl {
/// Receive best header from the target node.
pub fn target_best_header_response(&mut self, id: &HeaderId (
- &mut [&mut self.maybe_orphan, &mut self.orphan],
- &mut self.maybe_extra,
- &mut self.known_headers,
- HeaderStatus::MaybeExtra,
- id,
- );
+ self.header_synced(id)
}
/// Receive target node response for MaybeOrphan request.
@@ -315,6 +289,8 @@ impl {
pub fn maybe_extra_response(&mut self, id: &HeaderId {
/// Receive extra from source node.
pub fn extra_response(&mut self, id: &HeaderId {
}
}
- /// Prune and never accep headers before this block.
+ /// When header completion data is sent to target node.
+ pub fn header_completed(&mut self, id: &HeaderId (
+ &mut [&mut self.ready, &mut self.submitted],
+ &mut self.incomplete,
+ &mut self.known_headers,
+ HeaderStatus::Incomplete,
+ &new_incomplete_header,
+ );
+
+ log::debug!(
+ target: "bridge",
+ "Scheduling completion data retrieval for header: {:?}",
+ new_incomplete_header,
+ );
+
+ self.incomplete_headers.insert(new_incomplete_header, None);
+ }
+
+ // for all headers that were incompleted previously, but now are completed, we move
+ // all descendants from incomplete to ready
+ let just_completed_headers = self
+ .incomplete_headers
+ .keys()
+ .chain(self.completion_data.keys())
+ .filter(|id| !ids.contains(id))
+ .cloned()
+ .collect:: (
+ &mut [&mut self.incomplete],
+ &mut self.ready,
+ &mut self.known_headers,
+ HeaderStatus::Ready,
+ &just_completed_header,
+ );
+
+ log::debug!(
+ target: "bridge",
+ "Completion data is no longer required for header: {:?}",
+ just_completed_header,
+ );
+
+ self.incomplete_headers.remove(&just_completed_header);
+ self.completion_data.remove(&just_completed_header);
+ }
+ }
+
+ /// Returns id of the header for which we want to fetch completion data.
+ pub fn incomplete_header(&mut self) -> Option {
prune_queue(&mut self.extra, prune_border);
prune_queue(&mut self.ready, prune_border);
prune_queue(&mut self.submitted, prune_border);
+ prune_queue(&mut self.incomplete, prune_border);
prune_known_headers:: (&mut self.known_headers, prune_border);
self.prune_border = prune_border;
}
@@ -379,10 +484,81 @@ impl {
self.maybe_extra.clear();
self.extra.clear();
self.ready.clear();
+ self.incomplete.clear();
self.submitted.clear();
self.known_headers.clear();
self.prune_border = Zero::zero();
}
+
+ /// Returns true if parent of this header is either incomplete or waiting for
+ /// its own incomplete ancestor to be completed.
+ fn is_parent_incomplete(&self, id: &HeaderId (&mut self.known_headers, ¤t, HeaderStatus::Synced);
+
+ current = header.parent_id();
+ id_processed = true;
+ }
+
+ // remember that the header itself is synced
+ // (condition is here to avoid duplicate log messages)
+ if !id_processed {
+ set_header_status:: (&mut self.known_headers, &id, HeaderStatus::Synced);
+ }
+
+ // now let's move all descendants from maybe_orphan && orphan queues to
+ // maybe_extra queue
+ move_header_descendants:: (
+ &mut [&mut self.maybe_orphan, &mut self.orphan],
+ &mut self.maybe_extra,
+ &mut self.known_headers,
+ HeaderStatus::MaybeExtra,
+ id,
+ );
+ }
}
/// Insert header to the queue.
@@ -411,6 +587,14 @@ fn remove_header ,
+ id: &HeaderId > {
+ queue.get(&id.0).and_then(|by_hash| by_hash.get(&id.1))
+}
+
/// Move header from source to destination queue.
///
/// Returns ID of parent header, if header has been moved, or None otherwise.
@@ -428,16 +612,8 @@ fn move_header (known_headers, id, destination_status);
Some(parent_id)
}
@@ -473,19 +649,8 @@ fn move_header_descendants (known_headers, &header_to_move_id, destination_status);
}
}
@@ -544,6 +709,44 @@ fn prune_known_headers ,
+ id: &HeaderId