Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wait_for_finalized behavior if the tx dropped, usurped or invalid #897

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions subxt/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ pub enum TransactionError {
/// This is probably because the block was retracted before being finalized.
#[error("The block containing the transaction can no longer be found (perhaps it was on a non-finalized fork?)")]
BlockNotFound,
/// The transaction was deemed invalid in the current chain state.
#[error("The transaction is no longer valid")]
Invalid,
/// The transaction was replaced by a transaction with the same (sender, nonce) pair but with higher priority
#[error("The transaction was replaced by a transaction with the same (sender, nonce) pair but with higher priority.")]
Usurped,
/// The transaction was dropped because of some limit
#[error("The transaction was dropped from the pool because of a limit.")]
Dropped,
}

/// Something went wrong trying to encode a storage address.
Expand Down
3 changes: 2 additions & 1 deletion subxt/src/rpc/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ impl<Res> std::fmt::Debug for Subscription<Res> {
}

impl<Res> Subscription<Res> {
fn new(inner: RpcSubscription) -> Self {
/// Creates a new [`Subscription<Res>`].
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this type param in doc links is needless and could be simplified?!

Suggested change
/// Creates a new [`Subscription<Res>`].
/// Creates a new [`Subscription`].

pub fn new(inner: RpcSubscription) -> Self {
Self {
inner,
_marker: std::marker::PhantomData,
Expand Down
189 changes: 166 additions & 23 deletions subxt/src/tx/tx_progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ where
/// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the
/// transaction progresses, use [`TxProgress::next_item()`] instead.
///
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they
/// may well indicate with some probability that the transaction will not make it into a block,
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower
/// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself.
/// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some
/// probability that the transaction will not make it into a block but there is no guarantee
/// that this is true. In those cases the stream is closed however, so you currently have no
/// way to find out if they finally made it into a block or not.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally, we should have an integration test against substrate if this behavior gets changed but should be fine for now and the old RPC API will be replaced "soon" :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to elaborate the docs in substrate on the transaction status says one thing and the RPC implementation does something else :P

pub async fn wait_for_in_block(mut self) -> Result<TxInBlock<T, C>, Error> {
while let Some(status) = self.next_item().await {
match status? {
Expand All @@ -82,6 +82,9 @@ where
TxStatus::FinalityTimeout(_) => {
return Err(TransactionError::FinalitySubscriptionTimeout.into())
}
TxStatus::Invalid => return Err(TransactionError::Invalid.into()),
TxStatus::Usurped(_) => return Err(TransactionError::Usurped.into()),
TxStatus::Dropped => return Err(TransactionError::Dropped.into()),
// Ignore anything else and wait for next status event:
_ => continue,
}
Expand All @@ -95,10 +98,10 @@ where
/// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the
/// transaction progresses, use [`TxProgress::next_item()`] instead.
///
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they
/// may well indicate with some probability that the transaction will not make it into a block,
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower
/// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself.
/// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some
/// probability that the transaction will not make it into a block but there is no guarantee
/// that this is true. In those cases the stream is closed however, so you currently have no
/// way to find out if they finally made it into a block or not.
Copy link
Member

@niklasad1 niklasad1 Apr 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's possible to create another subscription and watch the extrinsic again if any of the transaction statuses wasn't finalized.

For example

let xt = create_xt()
submit(xt.clone).await?;
let mut watch_ext = rpc.watch_extrinsic(xt).await?;
loop {
   match watch_xt.next().await {
       TxStatus::Finalized(_) => break Ok(()),
       other => {
           log::warning!("Watch extrinsic {xt} failed: {other}, retrying again);
           // create a new subscription
           watch_ext = rpc.watch_extrinsic(xt).await?; 
       }
   }
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for sure complicated but possible

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah okay, I did not know this, should I add your code example to the doc comment?

Copy link
Member

@niklasad1 niklasad1 Apr 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That code doesn't really work for the TxProgress as it wraps subscription and extrinsic.
So you can just modify your comment In those cases the stream is closed however, so you currently have no way to find out to something like In those cases you have to re-subscribe to the extrinsic/create a new TxProgress repeatedly until the extrinsic is finalized

I'm just picking hair here but this is tricky to understand for sure

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether we should provide an API for this, what do you guys think?

Copy link
Collaborator

@jsdw jsdw Apr 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooh hold on; I didn't think that it was possible to "re subscribe" to a transaction again!

This would definitely be a good thing to try out, and I think a nice API to do that (maybe some function on this TxProgress thing) would be very cool.

Lemme write an issue and we can try it out!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether we should provide an API for this, what do you guys think?

You mentioned above that "the old RPC API will be replaced soon", so does that mean that also our stream handling will change? If that is the case, will the logic of this API need to change? If not it is certainly a good Idea to have an API for resubscribing right now, something like tx_progress.wait_until_finalized_until_timeout() that does the resubscribing under the hood.

pub async fn wait_for_finalized(mut self) -> Result<TxInBlock<T, C>, Error> {
while let Some(status) = self.next_item().await {
match status? {
Expand All @@ -108,6 +111,9 @@ where
TxStatus::FinalityTimeout(_) => {
return Err(TransactionError::FinalitySubscriptionTimeout.into())
}
TxStatus::Invalid => return Err(TransactionError::Invalid.into()),
TxStatus::Usurped(_) => return Err(TransactionError::Usurped.into()),
TxStatus::Dropped => return Err(TransactionError::Dropped.into()),
// Ignore and wait for next status event:
_ => continue,
}
Expand All @@ -122,10 +128,10 @@ where
/// **Note:** consumes self. If you'd like to perform multiple actions as progress is made,
/// use [`TxProgress::next_item()`] instead.
///
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they
/// may well indicate with some probability that the transaction will not make it into a block,
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower
/// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself.
/// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some
/// probability that the transaction will not make it into a block but there is no guarantee
/// that this is true. In those cases the stream is closed however, so you currently have no
/// way to find out if they finally made it into a block or not.
pub async fn wait_for_finalized_success(
self,
) -> Result<crate::blocks::ExtrinsicEvents<T>, Error> {
Expand All @@ -134,7 +140,7 @@ where
}
}

impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
impl<T: Config, C: Clone> Stream for TxProgress<T, C> {
type Item = Result<TxStatus<T, C>, Error>;

fn poll_next(
Expand All @@ -155,13 +161,18 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
TxStatus::InBlock(TxInBlock::new(hash, self.ext_hash, self.client.clone()))
}
SubstrateTxStatus::Retracted(hash) => TxStatus::Retracted(hash),
SubstrateTxStatus::Usurped(hash) => TxStatus::Usurped(hash),
SubstrateTxStatus::Dropped => TxStatus::Dropped,
SubstrateTxStatus::Invalid => TxStatus::Invalid,
// Only the following statuses are actually considered "final" (see the substrate
// docs on `TxStatus`). Basically, either the transaction makes it into a
// block, or we eventually give up on waiting for it to make it into a block.
// Even `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually.
// Only the following statuses are considered "final", in a sense that they end the stream (see the substrate
// docs on `TxStatus`):
//
// - Usurped
// - Finalized
// - FinalityTimeout
// - Invalid
// - Dropped
//
// Even though `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually,
// the server considers them final and closes the connection, when they are encountered.
// curently there is no way of telling if that happens, because the server ends the stream before.
//
// As an example, a transaction that is `Invalid` on one node due to having the wrong
// nonce might still be valid on some fork on another node which ends up being finalized.
Expand All @@ -175,6 +186,18 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
self.sub = None;
TxStatus::Finalized(TxInBlock::new(hash, self.ext_hash, self.client.clone()))
}
SubstrateTxStatus::Usurped(hash) => {
self.sub = None;
TxStatus::Usurped(hash)
}
SubstrateTxStatus::Dropped => {
self.sub = None;
TxStatus::Dropped
}
SubstrateTxStatus::Invalid => {
self.sub = None;
TxStatus::Invalid
}
}
})
}
Expand Down Expand Up @@ -220,8 +243,16 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
/// pool about such cases).
/// 4. `Retracted` transactions might be included in a future block.
///
/// The stream is considered finished only when either the `Finalized` or `FinalityTimeout`
/// event is triggered. You are however free to unsubscribe from notifications at any point.
/// Even though these cases can happen, the server-side of the stream is closed, if one of the following is encountered:
/// - Usurped
/// - Finalized
/// - FinalityTimeout
/// - Invalid
/// - Dropped
/// In any of these cases the client side TxProgress stream is also closed.
/// So there is currently no way for you to tell if an Dropped`/`Invalid`/`Usurped` transaction
/// reappears in the pool again or not.
/// You are free to unsubscribe from notifications at any point.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd move this to the bottom fo the comment, in part because the rest of this comment is copied from the TxStatus struct in Substrate :)

/// The first one will be emitted when the block in which the transaction was included gets
/// finalized. The `FinalityTimeout` event will be emitted when the block did not reach finality
/// within 512 blocks. This either indicates that finality is not available for your chain,
Expand Down Expand Up @@ -284,7 +315,7 @@ pub struct TxInBlock<T: Config, C> {
client: C,
}

impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
impl<T: Config, C> TxInBlock<T, C> {
pub(crate) fn new(block_hash: T::Hash, ext_hash: T::Hash, client: C) -> Self {
Self {
block_hash,
Expand All @@ -302,7 +333,9 @@ impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
pub fn extrinsic_hash(&self) -> T::Hash {
self.ext_hash
}
}

impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
/// Fetch the events associated with this transaction. If the transaction
/// was successful (ie no `ExtrinsicFailed`) events were found, then we return
/// the events associated with it. If the transaction was not successful, or
Expand Down Expand Up @@ -370,3 +403,113 @@ impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
))
}
}

#[cfg(test)]
mod test {
use std::pin::Pin;

use futures::Stream;
use primitive_types::H256;
use serde::Serialize;

use crate::{
client::{OfflineClientT, OnlineClientT},
config::polkadot::PolkadotConfig,
error::RpcError,
rpc::{types::SubstrateTxStatus, RpcSubscription, Subscription},
tx::TxProgress,
Error,
};

use serde_json::value::RawValue;

#[derive(Clone, Debug)]
struct MockClient;

impl OfflineClientT<PolkadotConfig> for MockClient {
fn metadata(&self) -> crate::Metadata {
panic!("just a mock impl to satisfy trait bounds")
}

fn genesis_hash(&self) -> <PolkadotConfig as crate::Config>::Hash {
panic!("just a mock impl to satisfy trait bounds")
}

fn runtime_version(&self) -> crate::rpc::types::RuntimeVersion {
panic!("just a mock impl to satisfy trait bounds")
}
}

impl OnlineClientT<PolkadotConfig> for MockClient {
fn rpc(&self) -> &crate::rpc::Rpc<PolkadotConfig> {
panic!("just a mock impl to satisfy trait bounds")
}
}

#[tokio::test]
async fn wait_for_finalized_returns_err_when_usurped() {
let c = MockClient;
let stream_elements: Vec<SubstrateTxStatus<H256, H256>> = vec![
SubstrateTxStatus::Ready,
SubstrateTxStatus::Usurped(Default::default()),
];
let sub = create_substrate_tx_status_subscription(stream_elements);
let tx_progress: TxProgress<PolkadotConfig, MockClient> =
TxProgress::new(sub, c, Default::default());
let finalized_result = tx_progress.wait_for_finalized().await;
assert!(matches!(
finalized_result,
Err(Error::Transaction(crate::error::TransactionError::Usurped))
));
}

#[tokio::test]
async fn wait_for_finalized_returns_err_when_dropped() {
let c = MockClient;
let stream_elements: Vec<SubstrateTxStatus<H256, H256>> =
vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Dropped];
let sub = create_substrate_tx_status_subscription(stream_elements);
let tx_progress: TxProgress<PolkadotConfig, MockClient> =
TxProgress::new(sub, c, Default::default());
Copy link
Collaborator

@jsdw jsdw Apr 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I think I'd wrap this duplicated stuff into some function with a signature like:

fn mock_tx_progress(statuses: Vec<SubstrateTxStatus>) -> TxProgress

Then it'll be really easy to see the logic that matters in each test because they will just look like:

let tx_progress = mock_tx_progress(vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Dropped]);
let finalized_result = tx_progress.wait_for_finalized().await;
assert!(matches!(
    finalized_result,
    Err(Error::Transaction(crate::error::TransactionError::Dropped))
));

let finalized_result = tx_progress.wait_for_finalized().await;
assert!(matches!(
finalized_result,
Err(Error::Transaction(crate::error::TransactionError::Dropped))
));
}

#[tokio::test]
async fn wait_for_finalized_returns_err_when_invalid() {
let c = MockClient;
let stream_elements: Vec<SubstrateTxStatus<H256, H256>> =
vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Invalid];
let sub = create_substrate_tx_status_subscription(stream_elements);
let tx_progress: TxProgress<PolkadotConfig, MockClient> =
TxProgress::new(sub, c, Default::default());
let finalized_result = tx_progress.wait_for_finalized().await;
assert!(matches!(
finalized_result,
Err(Error::Transaction(crate::error::TransactionError::Invalid))
));
}

fn create_substrate_tx_status_subscription<Hash: Send + 'static + Serialize>(
elements: Vec<SubstrateTxStatus<Hash, Hash>>,
) -> Subscription<SubstrateTxStatus<Hash, Hash>> {
let rpc_substription_stream: Pin<
Box<dyn Stream<Item = Result<Box<RawValue>, RpcError>> + Send + 'static>,
> = Box::pin(futures::stream::iter(elements.into_iter().map(|e| {
let s = serde_json::to_string(&e).unwrap();
let r: Box<RawValue> = RawValue::from_string(s).unwrap();
Ok(r)
})));

let rpc_subscription: RpcSubscription = RpcSubscription {
stream: rpc_substription_stream,
id: None,
};

let sub: Subscription<SubstrateTxStatus<Hash, Hash>> = Subscription::new(rpc_subscription);
sub
}
}