Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to jsonrpsee 0.7 and impl Stream on TransactionProgress #380

Merged
merged 3 commits into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ jobs:
with:
# Use this issue template:
filename: .github/issue_templates/nightly_run_failed.md
# Don't create a new issue; skip updating existing:
update_existing: false
# Update existing issue if found; hopefully will make it clearer
# that it is still an issue:
update_existing: true
# Look for new *open* issues in this search (we want to
# create a new one if we only find closed versions):
search_existing: open
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ chameleon = "0.1.0"
scale-info = { version = "1.0.0", features = ["bit-vec"] }
futures = "0.3.13"
hex = "0.4.3"
jsonrpsee = { version = "0.5.1", features = ["macros", "ws-client", "http-client"] }
jsonrpsee = { version = "0.7.0", features = ["macros", "ws-client", "http-client"] }
log = "0.4.14"
num-traits = { version = "0.2.14", default-features = false }
serde = { version = "1.0.124", features = ["derive"] }
Expand Down
4 changes: 3 additions & 1 deletion examples/submit_and_watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
//! polkadot --dev --tmp
//! ```

use futures::StreamExt;
use sp_keyring::AccountKeyring;
use subxt::{
ClientBuilder,
Expand Down Expand Up @@ -144,7 +145,8 @@ async fn handle_transfer_events() -> Result<(), Box<dyn std::error::Error>> {
.sign_and_submit_then_watch(&signer)
.await?;

while let Some(ev) = balance_transfer_progress.next().await? {
while let Some(ev) = balance_transfer_progress.next().await {
let ev = ev?;
use subxt::TransactionStatus::*;

// Made it into a block, but not finalized.
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
},
Metadata,
};
use jsonrpsee::types::Error as RequestError;
use jsonrpsee::core::error::Error as RequestError;
use sp_core::crypto::SecretStringError;
use sp_runtime::{
transaction_validity::TransactionValidityError,
Expand Down
32 changes: 15 additions & 17 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,23 @@ use core::{
};
use frame_metadata::RuntimeMetadataPrefixed;
use jsonrpsee::{
http_client::{
HttpClient,
HttpClientBuilder,
},
types::{
to_json_value,
traits::{
core::{
client::{
Client,
SubscriptionClient,
ClientT,
Subscription,
SubscriptionClientT,
},
to_json_value,
DeserializeOwned,
Error as RpcError,
JsonValue,
Subscription,
},
ws_client::{
WsClient,
WsClientBuilder,
http_client::{
HttpClient,
HttpClientBuilder,
},
ws_client::WsClientBuilder,
};
use serde::{
Deserialize,
Expand Down Expand Up @@ -172,7 +170,7 @@ pub enum SubstrateTransactionStatus<Hash, BlockHash> {
#[derive(Clone)]
pub enum RpcClient {
/// JSONRPC client WebSocket transport.
WebSocket(Arc<WsClient>),
WebSocket(Arc<Client>),
/// JSONRPC client HTTP transport.
// NOTE: Arc because `HttpClient` is not clone.
Http(Arc<HttpClient>),
Expand Down Expand Up @@ -239,14 +237,14 @@ impl RpcClient {
}
}

impl From<WsClient> for RpcClient {
fn from(client: WsClient) -> Self {
impl From<Client> for RpcClient {
fn from(client: Client) -> Self {
RpcClient::WebSocket(Arc::new(client))
}
}

impl From<Arc<WsClient>> for RpcClient {
fn from(client: Arc<WsClient>) -> Self {
impl From<Arc<Client>> for RpcClient {
fn from(client: Arc<Client>) -> Self {
RpcClient::WebSocket(client)
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with subxt. If not, see <http://www.gnu.org/licenses/>.

use jsonrpsee::types::{
use jsonrpsee::core::{
client::Subscription,
DeserializeOwned,
Subscription,
};
use sp_core::{
storage::{
Expand Down Expand Up @@ -247,12 +247,12 @@ where
T: DeserializeOwned,
{
match sub.next().await {
Ok(Some(next)) => Some(next),
Ok(None) => None,
Err(e) => {
Some(Ok(next)) => Some(next),
Some(Err(e)) => {
log::error!("Subscription {} failed: {:?} dropping", sub_name, e);
None
}
None => None,
}
}

Expand Down
161 changes: 93 additions & 68 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with subxt. If not, see <http://www.gnu.org/licenses/>.

use std::task::Poll;

use sp_core::storage::StorageKey;
use sp_runtime::traits::Hash;
pub use sp_runtime::traits::SignedExtension;
Expand All @@ -30,9 +32,13 @@ use crate::{
Config,
Phase,
};
use jsonrpsee::types::{
use futures::{
Stream,
StreamExt,
};
use jsonrpsee::core::{
client::Subscription as RpcSubscription,
Error as RpcError,
Subscription as RpcSubscription,
};

/// This struct represents a subscription to the progress of some transaction, and is
Expand All @@ -44,6 +50,11 @@ pub struct TransactionProgress<'client, T: Config> {
client: &'client Client<T>,
}

// The above type is not `Unpin` by default unless the generic param `T` is,
// so we manually make it clear that Unpin is actually fine regardless of `T`
// (we don't care if this moves around in memory while it's "pinned").
impl<'client, T: Config> Unpin for TransactionProgress<'client, T> {}

impl<'client, T: Config> TransactionProgress<'client, T> {
pub(crate) fn new(
sub: RpcSubscription<SubstrateTransactionStatus<T::Hash, T::Hash>>,
Expand All @@ -57,79 +68,31 @@ impl<'client, T: Config> TransactionProgress<'client, T> {
}
}

/// Return the next transaction status when it's emitted.
pub async fn next(&mut self) -> Result<Option<TransactionStatus<'client, T>>, Error> {
// Return `None` if the subscription has been dropped:
let sub = match &mut self.sub {
Some(sub) => sub,
None => return Ok(None),
};

// Return the next item otherwise:
let res = sub.next().await?;
Ok(res.map(|status| {
match status {
SubstrateTransactionStatus::Future => TransactionStatus::Future,
SubstrateTransactionStatus::Ready => TransactionStatus::Ready,
SubstrateTransactionStatus::Broadcast(peers) => {
TransactionStatus::Broadcast(peers)
}
SubstrateTransactionStatus::InBlock(hash) => {
TransactionStatus::InBlock(TransactionInBlock {
block_hash: hash,
ext_hash: self.ext_hash,
client: self.client,
})
}
SubstrateTransactionStatus::Retracted(hash) => {
TransactionStatus::Retracted(hash)
}
SubstrateTransactionStatus::Usurped(hash) => {
TransactionStatus::Usurped(hash)
}
SubstrateTransactionStatus::Dropped => TransactionStatus::Dropped,
SubstrateTransactionStatus::Invalid => TransactionStatus::Invalid,
// Only the following statuses are actually considered "final" (see the substrate
// docs on `TransactionStatus`). Basically, either the transaction makes it into a
// block, or we eventually give up on waiting for it to make it into a block.
// Even `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually.
//
// As an example, a transaction that is `Invalid` on one node due to having the wrong
// nonce might still be valid on some fork on another node which ends up being finalized.
// Equally, a transaction `Dropped` from one node may still be in the transaction pool,
// and make it into a block, on another node. Likewise with `Usurped`.
SubstrateTransactionStatus::FinalityTimeout(hash) => {
self.sub = None;
TransactionStatus::FinalityTimeout(hash)
}
SubstrateTransactionStatus::Finalized(hash) => {
self.sub = None;
TransactionStatus::Finalized(TransactionInBlock {
block_hash: hash,
ext_hash: self.ext_hash,
client: self.client,
})
}
}
}))
/// Return the next transaction status when it's emitted. This just delegates to the
/// [`futures::Stream`] implementation for [`TransactionProgress`], but allows you to
/// avoid importing that trait if you don't otherwise need it.
pub async fn next_item(
&mut self,
) -> Option<Result<TransactionStatus<'client, T>, Error>> {
self.next().await
}

/// Wait for the transaction to be in a block (but not necessarily finalized), and return
/// an [`TransactionInBlock`] instance when this happens, or an error if there was a problem
/// waiting for this to happen.
///
/// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the
/// transaction progresses, use [`TransactionProgress::next()`] instead.
/// transaction progresses, use [`TransactionProgress::next_item()`] instead.
///
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they
/// may well indicate with some probability that the transaction will not make it into a block,
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower
/// level [`TransactionProgress::next()`] API if you'd like to handle these statuses yourself.
/// level [`TransactionProgress::next_item()`] API if you'd like to handle these statuses yourself.
pub async fn wait_for_in_block(
mut self,
) -> Result<TransactionInBlock<'client, T>, Error> {
while let Some(status) = self.next().await? {
match status {
while let Some(status) = self.next_item().await {
match status? {
// Finalized or otherwise in a block! Return.
TransactionStatus::InBlock(s) | TransactionStatus::Finalized(s) => {
return Ok(s)
Expand All @@ -149,17 +112,17 @@ impl<'client, T: Config> TransactionProgress<'client, T> {
/// instance when it is, or an error if there was a problem waiting for finalization.
///
/// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the
/// transaction progresses, use [`TransactionProgress::next()`] instead.
/// transaction progresses, use [`TransactionProgress::next_item()`] instead.
///
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they
/// may well indicate with some probability that the transaction will not make it into a block,
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower
/// level [`TransactionProgress::next()`] API if you'd like to handle these statuses yourself.
/// level [`TransactionProgress::next_item()`] API if you'd like to handle these statuses yourself.
pub async fn wait_for_finalized(
mut self,
) -> Result<TransactionInBlock<'client, T>, Error> {
while let Some(status) = self.next().await? {
match status {
while let Some(status) = self.next_item().await {
match status? {
// Finalized! Return.
TransactionStatus::Finalized(s) => return Ok(s),
// Error scenarios; return the error.
Expand All @@ -178,24 +141,86 @@ impl<'client, T: Config> TransactionProgress<'client, T> {
/// as well as a couple of other details (block hash and extrinsic hash).
///
/// **Note:** consumes self. If you'd like to perform multiple actions as progress is made,
/// use [`TransactionProgress::next()`] instead.
/// use [`TransactionProgress::next_item()`] instead.
///
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they
/// may well indicate with some probability that the transaction will not make it into a block,
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower
/// level [`TransactionProgress::next()`] API if you'd like to handle these statuses yourself.
/// level [`TransactionProgress::next_item()`] API if you'd like to handle these statuses yourself.
pub async fn wait_for_finalized_success(self) -> Result<TransactionEvents<T>, Error> {
let evs = self.wait_for_finalized().await?.wait_for_success().await?;
Ok(evs)
}
}

impl<'client, T: Config> Stream for TransactionProgress<'client, T> {
type Item = Result<TransactionStatus<'client, T>, Error>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let sub = match self.sub.as_mut() {
Some(sub) => sub,
None => return Poll::Ready(None),
};

sub.poll_next_unpin(cx)
.map_err(|e| e.into())
.map_ok(|status| {
match status {
SubstrateTransactionStatus::Future => TransactionStatus::Future,
SubstrateTransactionStatus::Ready => TransactionStatus::Ready,
SubstrateTransactionStatus::Broadcast(peers) => {
TransactionStatus::Broadcast(peers)
}
SubstrateTransactionStatus::InBlock(hash) => {
TransactionStatus::InBlock(TransactionInBlock {
block_hash: hash,
ext_hash: self.ext_hash,
client: self.client,
})
}
SubstrateTransactionStatus::Retracted(hash) => {
TransactionStatus::Retracted(hash)
}
SubstrateTransactionStatus::Usurped(hash) => {
TransactionStatus::Usurped(hash)
}
SubstrateTransactionStatus::Dropped => TransactionStatus::Dropped,
SubstrateTransactionStatus::Invalid => TransactionStatus::Invalid,
// Only the following statuses are actually considered "final" (see the substrate
// docs on `TransactionStatus`). Basically, either the transaction makes it into a
// block, or we eventually give up on waiting for it to make it into a block.
// Even `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually.
//
// As an example, a transaction that is `Invalid` on one node due to having the wrong
// nonce might still be valid on some fork on another node which ends up being finalized.
// Equally, a transaction `Dropped` from one node may still be in the transaction pool,
// and make it into a block, on another node. Likewise with `Usurped`.
SubstrateTransactionStatus::FinalityTimeout(hash) => {
self.sub = None;
TransactionStatus::FinalityTimeout(hash)
}
SubstrateTransactionStatus::Finalized(hash) => {
self.sub = None;
TransactionStatus::Finalized(TransactionInBlock {
block_hash: hash,
ext_hash: self.ext_hash,
client: self.client,
})
}
}
})
}
}

//* Dev note: The below is adapted from the substrate docs on `TransactionStatus`, which this
//* enum was adapted from (and which is an exact copy of `SubstrateTransactionStatus` in this crate).
//* Note that the number of finality watchers is, at the time of writing, found in the constant
//* `MAX_FINALITY_WATCHERS` in the `sc_transaction_pool` crate.
//*
/// Possible transaction statuses returned from our [`TransactionProgress::next()`] call.
/// Possible transaction statuses returned from our [`TransactionProgress::next_item()`] call.
///
/// These status events can be grouped based on their kinds as:
///
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ async fn chain_subscribe_blocks() {
let node_process = test_node_process().await;
let client = node_process.client();
let mut blocks = client.rpc().subscribe_blocks().await.unwrap();
blocks.next().await.unwrap();
blocks.next().await.unwrap().unwrap();
}

#[async_std::test]
async fn chain_subscribe_finalized_blocks() {
let node_process = test_node_process().await;
let client = node_process.client();
let mut blocks = client.rpc().subscribe_finalized_blocks().await.unwrap();
blocks.next().await.unwrap();
blocks.next().await.unwrap().unwrap();
}

#[async_std::test]
Expand Down