-
Notifications
You must be signed in to change notification settings - Fork 261
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
wait_for_finalized behavior if the tx dropped, usurped or invalid #897
Changes from 4 commits
1007819
f953408
5b69c6d
c26532f
5338fc6
82f4957
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,10 +69,10 @@ where | |
/// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the | ||
/// transaction progresses, use [`TxProgress::next_item()`] instead. | ||
/// | ||
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they | ||
/// may well indicate with some probability that the transaction will not make it into a block, | ||
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower | ||
/// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself. | ||
/// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some | ||
/// probability that the transaction will not make it into a block but there is no guarantee | ||
/// that this is true. In those cases the stream is closed however, so you currently have no | ||
/// way to find out if they finally made it into a block or not. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ideally, we should have an integration test against substrate if this behavior gets changed but should be fine for now and the old RPC API will be replaced "soon" :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to elaborate the docs in substrate on the transaction status says one thing and the RPC implementation does something else :P |
||
pub async fn wait_for_in_block(mut self) -> Result<TxInBlock<T, C>, Error> { | ||
while let Some(status) = self.next_item().await { | ||
match status? { | ||
|
@@ -82,6 +82,9 @@ where | |
TxStatus::FinalityTimeout(_) => { | ||
return Err(TransactionError::FinalitySubscriptionTimeout.into()) | ||
} | ||
TxStatus::Invalid => return Err(TransactionError::Invalid.into()), | ||
TxStatus::Usurped(_) => return Err(TransactionError::Usurped.into()), | ||
TxStatus::Dropped => return Err(TransactionError::Dropped.into()), | ||
// Ignore anything else and wait for next status event: | ||
_ => continue, | ||
} | ||
|
@@ -95,10 +98,10 @@ where | |
/// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the | ||
/// transaction progresses, use [`TxProgress::next_item()`] instead. | ||
/// | ||
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they | ||
/// may well indicate with some probability that the transaction will not make it into a block, | ||
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower | ||
/// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself. | ||
/// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some | ||
/// probability that the transaction will not make it into a block but there is no guarantee | ||
/// that this is true. In those cases the stream is closed however, so you currently have no | ||
/// way to find out if they finally made it into a block or not. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's possible to create another subscription and watch the extrinsic again if any of the transaction statuses wasn't finalized. For example let xt = create_xt()
submit(xt.clone).await?;
let mut watch_ext = rpc.watch_extrinsic(xt).await?;
loop {
match watch_xt.next().await {
TxStatus::Finalized(_) => break Ok(()),
other => {
log::warning!("Watch extrinsic {xt} failed: {other}, retrying again);
// create a new subscription
watch_ext = rpc.watch_extrinsic(xt).await?;
}
}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for sure complicated but possible There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah okay, I did not know this, should I add your code example to the doc comment? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That code doesn't really work for the I'm just picking hair here but this is tricky to understand for sure There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure whether we should provide an API for this, what do you guys think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ooh hold on; I didn't think that it was possible to "re subscribe" to a transaction again! This would definitely be a good thing to try out, and I think a nice API to do that (maybe some function on this Lemme write an issue and we can try it out! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
You mentioned above that "the old RPC API will be replaced soon", so does that mean that also our stream handling will change? If that is the case, will the logic of this API need to change? If not it is certainly a good Idea to have an API for resubscribing right now, something like |
||
pub async fn wait_for_finalized(mut self) -> Result<TxInBlock<T, C>, Error> { | ||
while let Some(status) = self.next_item().await { | ||
match status? { | ||
|
@@ -108,6 +111,9 @@ where | |
TxStatus::FinalityTimeout(_) => { | ||
return Err(TransactionError::FinalitySubscriptionTimeout.into()) | ||
niklasad1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
TxStatus::Invalid => return Err(TransactionError::Invalid.into()), | ||
TxStatus::Usurped(_) => return Err(TransactionError::Usurped.into()), | ||
TxStatus::Dropped => return Err(TransactionError::Dropped.into()), | ||
// Ignore and wait for next status event: | ||
_ => continue, | ||
} | ||
|
@@ -122,10 +128,10 @@ where | |
/// **Note:** consumes self. If you'd like to perform multiple actions as progress is made, | ||
/// use [`TxProgress::next_item()`] instead. | ||
/// | ||
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they | ||
/// may well indicate with some probability that the transaction will not make it into a block, | ||
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower | ||
/// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself. | ||
/// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some | ||
/// probability that the transaction will not make it into a block but there is no guarantee | ||
/// that this is true. In those cases the stream is closed however, so you currently have no | ||
/// way to find out if they finally made it into a block or not. | ||
niklasad1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
pub async fn wait_for_finalized_success( | ||
self, | ||
) -> Result<crate::blocks::ExtrinsicEvents<T>, Error> { | ||
|
@@ -134,7 +140,7 @@ where | |
} | ||
} | ||
|
||
impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> { | ||
impl<T: Config, C: Clone> Stream for TxProgress<T, C> { | ||
type Item = Result<TxStatus<T, C>, Error>; | ||
|
||
fn poll_next( | ||
|
@@ -155,13 +161,18 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> { | |
TxStatus::InBlock(TxInBlock::new(hash, self.ext_hash, self.client.clone())) | ||
} | ||
SubstrateTxStatus::Retracted(hash) => TxStatus::Retracted(hash), | ||
SubstrateTxStatus::Usurped(hash) => TxStatus::Usurped(hash), | ||
SubstrateTxStatus::Dropped => TxStatus::Dropped, | ||
SubstrateTxStatus::Invalid => TxStatus::Invalid, | ||
// Only the following statuses are actually considered "final" (see the substrate | ||
// docs on `TxStatus`). Basically, either the transaction makes it into a | ||
// block, or we eventually give up on waiting for it to make it into a block. | ||
// Even `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually. | ||
// Only the following statuses are considered "final", in a sense that they end the stream (see the substrate | ||
// docs on `TxStatus`): | ||
// | ||
// - Usurped | ||
// - Finalized | ||
// - FinalityTimeout | ||
// - Invalid | ||
// - Dropped | ||
// | ||
// Even though `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually, | ||
// the server considers them final and closes the connection, when they are encountered. | ||
// curently there is no way of telling if that happens, because the server ends the stream before. | ||
// | ||
// As an example, a transaction that is `Invalid` on one node due to having the wrong | ||
// nonce might still be valid on some fork on another node which ends up being finalized. | ||
|
@@ -175,6 +186,18 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> { | |
self.sub = None; | ||
TxStatus::Finalized(TxInBlock::new(hash, self.ext_hash, self.client.clone())) | ||
} | ||
SubstrateTxStatus::Usurped(hash) => { | ||
self.sub = None; | ||
TxStatus::Usurped(hash) | ||
} | ||
SubstrateTxStatus::Dropped => { | ||
self.sub = None; | ||
TxStatus::Dropped | ||
} | ||
SubstrateTxStatus::Invalid => { | ||
self.sub = None; | ||
TxStatus::Invalid | ||
} | ||
} | ||
}) | ||
} | ||
|
@@ -220,8 +243,16 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> { | |
/// pool about such cases). | ||
/// 4. `Retracted` transactions might be included in a future block. | ||
/// | ||
/// The stream is considered finished only when either the `Finalized` or `FinalityTimeout` | ||
/// event is triggered. You are however free to unsubscribe from notifications at any point. | ||
/// Even though these cases can happen, the server-side of the stream is closed, if one of the following is encountered: | ||
/// - Usurped | ||
/// - Finalized | ||
/// - FinalityTimeout | ||
/// - Invalid | ||
/// - Dropped | ||
/// In any of these cases the client side TxProgress stream is also closed. | ||
/// So there is currently no way for you to tell if an Dropped`/`Invalid`/`Usurped` transaction | ||
/// reappears in the pool again or not. | ||
/// You are free to unsubscribe from notifications at any point. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I'd move this to the bottom fo the comment, in part because the rest of this comment is copied from the TxStatus struct in Substrate :) |
||
/// The first one will be emitted when the block in which the transaction was included gets | ||
/// finalized. The `FinalityTimeout` event will be emitted when the block did not reach finality | ||
/// within 512 blocks. This either indicates that finality is not available for your chain, | ||
|
@@ -284,7 +315,7 @@ pub struct TxInBlock<T: Config, C> { | |
client: C, | ||
} | ||
|
||
impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> { | ||
impl<T: Config, C> TxInBlock<T, C> { | ||
pub(crate) fn new(block_hash: T::Hash, ext_hash: T::Hash, client: C) -> Self { | ||
Self { | ||
block_hash, | ||
|
@@ -302,7 +333,9 @@ impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> { | |
pub fn extrinsic_hash(&self) -> T::Hash { | ||
self.ext_hash | ||
} | ||
} | ||
|
||
impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> { | ||
/// Fetch the events associated with this transaction. If the transaction | ||
/// was successful (ie no `ExtrinsicFailed`) events were found, then we return | ||
/// the events associated with it. If the transaction was not successful, or | ||
|
@@ -370,3 +403,113 @@ impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> { | |
)) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod test { | ||
use std::pin::Pin; | ||
|
||
use futures::Stream; | ||
use primitive_types::H256; | ||
use serde::Serialize; | ||
|
||
use crate::{ | ||
client::{OfflineClientT, OnlineClientT}, | ||
config::polkadot::PolkadotConfig, | ||
error::RpcError, | ||
rpc::{types::SubstrateTxStatus, RpcSubscription, Subscription}, | ||
tx::TxProgress, | ||
Error, | ||
}; | ||
|
||
use serde_json::value::RawValue; | ||
|
||
#[derive(Clone, Debug)] | ||
struct MockClient; | ||
|
||
impl OfflineClientT<PolkadotConfig> for MockClient { | ||
fn metadata(&self) -> crate::Metadata { | ||
panic!("just a mock impl to satisfy trait bounds") | ||
} | ||
|
||
fn genesis_hash(&self) -> <PolkadotConfig as crate::Config>::Hash { | ||
panic!("just a mock impl to satisfy trait bounds") | ||
} | ||
|
||
fn runtime_version(&self) -> crate::rpc::types::RuntimeVersion { | ||
panic!("just a mock impl to satisfy trait bounds") | ||
} | ||
} | ||
|
||
impl OnlineClientT<PolkadotConfig> for MockClient { | ||
fn rpc(&self) -> &crate::rpc::Rpc<PolkadotConfig> { | ||
panic!("just a mock impl to satisfy trait bounds") | ||
} | ||
} | ||
|
||
#[tokio::test] | ||
async fn wait_for_finalized_returns_err_when_usurped() { | ||
let c = MockClient; | ||
let stream_elements: Vec<SubstrateTxStatus<H256, H256>> = vec![ | ||
niklasad1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
SubstrateTxStatus::Ready, | ||
SubstrateTxStatus::Usurped(Default::default()), | ||
]; | ||
let sub = create_substrate_tx_status_subscription(stream_elements); | ||
let tx_progress: TxProgress<PolkadotConfig, MockClient> = | ||
TxProgress::new(sub, c, Default::default()); | ||
let finalized_result = tx_progress.wait_for_finalized().await; | ||
assert!(matches!( | ||
finalized_result, | ||
Err(Error::Transaction(crate::error::TransactionError::Usurped)) | ||
)); | ||
} | ||
|
||
#[tokio::test] | ||
async fn wait_for_finalized_returns_err_when_dropped() { | ||
let c = MockClient; | ||
let stream_elements: Vec<SubstrateTxStatus<H256, H256>> = | ||
vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Dropped]; | ||
let sub = create_substrate_tx_status_subscription(stream_elements); | ||
let tx_progress: TxProgress<PolkadotConfig, MockClient> = | ||
TxProgress::new(sub, c, Default::default()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Personally I think I'd wrap this duplicated stuff into some function with a signature like: fn mock_tx_progress(statuses: Vec<SubstrateTxStatus>) -> TxProgress Then it'll be really easy to see the logic that matters in each test because they will just look like: let tx_progress = mock_tx_progress(vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Dropped]);
let finalized_result = tx_progress.wait_for_finalized().await;
assert!(matches!(
finalized_result,
Err(Error::Transaction(crate::error::TransactionError::Dropped))
)); |
||
let finalized_result = tx_progress.wait_for_finalized().await; | ||
assert!(matches!( | ||
finalized_result, | ||
Err(Error::Transaction(crate::error::TransactionError::Dropped)) | ||
)); | ||
} | ||
|
||
#[tokio::test] | ||
async fn wait_for_finalized_returns_err_when_invalid() { | ||
let c = MockClient; | ||
let stream_elements: Vec<SubstrateTxStatus<H256, H256>> = | ||
vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Invalid]; | ||
let sub = create_substrate_tx_status_subscription(stream_elements); | ||
let tx_progress: TxProgress<PolkadotConfig, MockClient> = | ||
TxProgress::new(sub, c, Default::default()); | ||
let finalized_result = tx_progress.wait_for_finalized().await; | ||
assert!(matches!( | ||
finalized_result, | ||
Err(Error::Transaction(crate::error::TransactionError::Invalid)) | ||
)); | ||
} | ||
|
||
fn create_substrate_tx_status_subscription<Hash: Send + 'static + Serialize>( | ||
elements: Vec<SubstrateTxStatus<Hash, Hash>>, | ||
) -> Subscription<SubstrateTxStatus<Hash, Hash>> { | ||
let rpc_substription_stream: Pin< | ||
Box<dyn Stream<Item = Result<Box<RawValue>, RpcError>> + Send + 'static>, | ||
> = Box::pin(futures::stream::iter(elements.into_iter().map(|e| { | ||
let s = serde_json::to_string(&e).unwrap(); | ||
let r: Box<RawValue> = RawValue::from_string(s).unwrap(); | ||
Ok(r) | ||
}))); | ||
|
||
let rpc_subscription: RpcSubscription = RpcSubscription { | ||
stream: rpc_substription_stream, | ||
id: None, | ||
}; | ||
|
||
let sub: Subscription<SubstrateTxStatus<Hash, Hash>> = Subscription::new(rpc_subscription); | ||
niklasad1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
sub | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this type param in
doc links
is needless and could be simplified?!