Skip to content

Commit

Permalink
[rpc-alt] use bigtable for transaction lookup (#21293)
Browse files Browse the repository at this point in the history
## Description 

Make transaction lookup use bigtable kvstore as well.

## Test plan 

Tested locally against bigtable.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
emmazzz authored Feb 21, 2025
1 parent 7c3c3fb commit 063d223
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 39 deletions.
63 changes: 25 additions & 38 deletions crates/sui-indexer-alt-jsonrpc/src/api/transactions/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ use std::str::FromStr;
use anyhow::Context as _;
use futures::future::OptionFuture;
use move_core_types::annotated_value::{MoveDatatypeLayout, MoveTypeLayout};
use sui_indexer_alt_schema::transactions::{
BalanceChange, StoredTransaction, StoredTxBalanceChange,
};
use sui_indexer_alt_schema::transactions::{BalanceChange, StoredTxBalanceChange};
use sui_json_rpc_types::{
BalanceChange as SuiBalanceChange, ObjectChange as SuiObjectChange, SuiEvent,
SuiTransactionBlock, SuiTransactionBlockData, SuiTransactionBlockEffects,
Expand All @@ -29,7 +27,7 @@ use tokio::join;
use crate::{
context::Context,
data::{
objects::VersionedObjectKey, transactions::TransactionKey,
kv_loader::TransactionContents, objects::VersionedObjectKey,
tx_balance_changes::TxBalanceChangeKey,
},
error::{invalid_params, rpc_bail, RpcError},
Expand All @@ -44,15 +42,15 @@ pub(super) async fn transaction(
digest: TransactionDigest,
options: &SuiTransactionBlockResponseOptions,
) -> Result<SuiTransactionBlockResponse, RpcError<Error>> {
let stored_tx = ctx.pg_loader().load_one(TransactionKey(digest));
let tx = ctx.kv_loader().load_one_transaction(digest);
let stored_bc: OptionFuture<_> = options
.show_balance_changes
.then(|| ctx.pg_loader().load_one(TxBalanceChangeKey(digest)))
.into();

let (stored_tx, stored_bc) = join!(stored_tx, stored_bc);
let (tx, stored_bc) = join!(tx, stored_bc);

let stored_tx = stored_tx
let tx = tx
.context("Failed to fetch transaction from store")?
.ok_or_else(|| invalid_params(Error::NotFound(digest)))?;

Expand All @@ -67,37 +65,36 @@ pub(super) async fn transaction(
None => None,
};

let digest = TransactionDigest::try_from(stored_tx.tx_digest.clone())
.context("Failed to deserialize transaction digest")?;
let digest = tx.digest()?;

let mut response = SuiTransactionBlockResponse::new(digest);

if options.show_input {
response.transaction = Some(input(ctx, &stored_tx).await?);
response.transaction = Some(input(ctx, &tx).await?);
}

if options.show_raw_input {
response.raw_transaction = stored_tx.raw_transaction.clone();
response.raw_transaction = tx.raw_transaction()?;
}

if options.show_effects {
response.effects = Some(effects(&stored_tx)?);
response.effects = Some(effects(&tx)?);
}

if options.show_raw_effects {
response.raw_effects = stored_tx.raw_effects.clone();
response.raw_effects = tx.raw_effects()?;
}

if options.show_events {
response.events = Some(events(ctx, digest, &stored_tx).await?);
response.events = Some(events(ctx, digest, &tx).await?);
}

if let Some(changes) = stored_bc {
response.balance_changes = Some(balance_changes(changes)?);
}

if options.show_object_changes {
response.object_changes = Some(object_changes(ctx, digest, &stored_tx).await?);
response.object_changes = Some(object_changes(ctx, digest, &tx).await?);
}

Ok(response)
Expand All @@ -106,12 +103,10 @@ pub(super) async fn transaction(
/// Extract a representation of the transaction's input data from the stored form.
async fn input(
ctx: &Context,
tx: &StoredTransaction,
tx: &TransactionContents,
) -> Result<SuiTransactionBlock, RpcError<Error>> {
let data: TransactionData =
bcs::from_bytes(&tx.raw_transaction).context("Failed to deserialize TransactionData")?;
let tx_signatures: Vec<GenericSignature> =
bcs::from_bytes(&tx.user_signatures).context("Failed to deserialize user signatures")?;
let data: TransactionData = tx.data()?;
let tx_signatures: Vec<GenericSignature> = tx.signatures()?;

Ok(SuiTransactionBlock {
data: SuiTransactionBlockData::try_from_with_package_resolver(data, ctx.package_resolver())
Expand All @@ -122,9 +117,8 @@ async fn input(
}

/// Extract a representation of the transaction's effects from the stored form.
fn effects(tx: &StoredTransaction) -> Result<SuiTransactionBlockEffects, RpcError<Error>> {
let effects: TransactionEffects =
bcs::from_bytes(&tx.raw_effects).context("Failed to deserialize TransactionEffects")?;
fn effects(tx: &TransactionContents) -> Result<SuiTransactionBlockEffects, RpcError<Error>> {
let effects: TransactionEffects = tx.effects()?;
Ok(effects
.try_into()
.context("Failed to convert Effects into response")?)
Expand All @@ -134,9 +128,9 @@ fn effects(tx: &StoredTransaction) -> Result<SuiTransactionBlockEffects, RpcErro
async fn events(
ctx: &Context,
digest: TransactionDigest,
tx: &StoredTransaction,
tx: &TransactionContents,
) -> Result<SuiTransactionBlockEvents, RpcError<Error>> {
let events: Vec<Event> = bcs::from_bytes(&tx.events).context("Failed to deserialize Events")?;
let events: Vec<Event> = tx.events()?;
let mut sui_events = Vec::with_capacity(events.len());

for (ix, event) in events.into_iter().enumerate() {
Expand All @@ -158,14 +152,9 @@ async fn events(
),
};

let sui_event = SuiEvent::try_from(
event,
digest,
ix as u64,
Some(tx.timestamp_ms as u64),
layout,
)
.with_context(|| format!("Failed to convert Event {ix} into response"))?;
let sui_event =
SuiEvent::try_from(event, digest, ix as u64, Some(tx.timestamp_ms()), layout)
.with_context(|| format!("Failed to convert Event {ix} into response"))?;

sui_events.push(sui_event)
}
Expand Down Expand Up @@ -205,12 +194,10 @@ fn balance_changes(
async fn object_changes(
ctx: &Context,
digest: TransactionDigest,
tx: &StoredTransaction,
tx: &TransactionContents,
) -> Result<Vec<SuiObjectChange>, RpcError<Error>> {
let tx_data: TransactionData =
bcs::from_bytes(&tx.raw_transaction).context("Failed to deserialize TransactionData")?;
let effects: TransactionEffects =
bcs::from_bytes(&tx.raw_effects).context("Failed to deserialize TransactionEffects")?;
let tx_data: TransactionData = tx.data()?;
let effects: TransactionEffects = tx.effects()?;

let mut keys = vec![];
let native_changes = effects.object_changes();
Expand Down
117 changes: 117 additions & 0 deletions crates/sui-indexer-alt-jsonrpc/src/data/kv_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,21 @@
use super::error::Error;
use super::objects::VersionedObjectKey;
use super::pg_reader::PgReader;
use super::transactions::TransactionKey;
use super::{bigtable_reader::BigtableReader, checkpoints::CheckpointKey};
use async_graphql::dataloader::DataLoader;
use std::collections::HashMap;
use std::sync::Arc;
use sui_indexer_alt_schema::transactions::StoredTransaction;
use sui_kvstore::TransactionData as KVTransactionData;
use sui_types::base_types::ObjectID;
use sui_types::digests::TransactionDigest;
use sui_types::effects::TransactionEffects;
use sui_types::signature::GenericSignature;
use sui_types::transaction::TransactionData;
use sui_types::{
crypto::AuthorityQuorumSignInfo,
event::Event,
messages_checkpoint::{CheckpointContents, CheckpointSummary},
object::Object,
};
Expand All @@ -26,6 +34,12 @@ pub(crate) enum KvLoader {
Pg(Arc<DataLoader<PgReader>>),
}

/// A wrapper for the contents of a transaction, either from Bigtable or Postgres.
pub(crate) enum TransactionContents {
Bigtable(KVTransactionData),
Pg(StoredTransaction),
}

impl KvLoader {
pub(crate) fn new_with_bigtable(bigtable_loader: Arc<DataLoader<BigtableReader>>) -> Self {
Self::Bigtable(bigtable_loader)
Expand Down Expand Up @@ -116,4 +130,107 @@ impl KvLoader {
.transpose(),
}
}

pub(crate) async fn load_one_transaction(
&self,
digest: TransactionDigest,
) -> Result<Option<TransactionContents>, Arc<Error>> {
let key = TransactionKey(digest);
match self {
Self::Bigtable(loader) => Ok(loader
.load_one(key)
.await?
.map(TransactionContents::Bigtable)),
Self::Pg(loader) => Ok(loader.load_one(key).await?.map(TransactionContents::Pg)),
}
}
}

impl TransactionContents {
pub(crate) fn data(&self) -> anyhow::Result<TransactionData> {
match self {
Self::Pg(stored) => {
let data: TransactionData =
bcs::from_bytes(&stored.raw_transaction).map_err(|e| {
anyhow::anyhow!("Failed to deserialize transaction data: {}", e)
})?;

Ok(data)
}
Self::Bigtable(kv) => Ok(kv.transaction.data().transaction_data().clone()),
}
}

pub(crate) fn digest(&self) -> anyhow::Result<TransactionDigest> {
match self {
Self::Pg(stored) => {
let digest =
TransactionDigest::try_from(stored.tx_digest.clone()).map_err(|e| {
anyhow::anyhow!("Failed to deserialize transaction digest: {}", e)
})?;

Ok(digest)
}
Self::Bigtable(kv) => Ok(*kv.transaction.digest()),
}
}

pub(crate) fn signatures(&self) -> anyhow::Result<Vec<GenericSignature>> {
match self {
Self::Pg(stored) => {
let signatures: Vec<GenericSignature> = bcs::from_bytes(&stored.user_signatures)
.map_err(|e| anyhow::anyhow!("Failed to deserialize signatures: {}", e))?;

Ok(signatures)
}
Self::Bigtable(kv) => Ok(kv.transaction.tx_signatures().to_vec()),
}
}

pub(crate) fn effects(&self) -> anyhow::Result<TransactionEffects> {
match self {
Self::Pg(stored) => {
let effects: TransactionEffects = bcs::from_bytes(&stored.raw_effects)
.map_err(|e| anyhow::anyhow!("Failed to deserialize effects: {}", e))?;

Ok(effects)
}
Self::Bigtable(kv) => Ok(kv.effects.clone()),
}
}

pub(crate) fn events(&self) -> anyhow::Result<Vec<Event>> {
match self {
Self::Pg(stored) => {
let events: Vec<Event> = bcs::from_bytes(&stored.events)
.map_err(|e| anyhow::anyhow!("Failed to deserialize events: {}", e))?;

Ok(events)
}
Self::Bigtable(kv) => Ok(kv.events.clone().unwrap_or_default().data),
}
}

pub(crate) fn raw_transaction(&self) -> anyhow::Result<Vec<u8>> {
match self {
Self::Pg(stored) => Ok(stored.raw_transaction.clone()),
Self::Bigtable(kv) => bcs::to_bytes(kv.transaction.data().transaction_data())
.map_err(|e| anyhow::anyhow!("Failed to serialize transaction: {}", e)),
}
}

pub(crate) fn raw_effects(&self) -> anyhow::Result<Vec<u8>> {
match self {
Self::Pg(stored) => Ok(stored.raw_effects.clone()),
Self::Bigtable(kv) => bcs::to_bytes(&kv.effects)
.map_err(|e| anyhow::anyhow!("Failed to serialize effects: {}", e)),
}
}

pub(crate) fn timestamp_ms(&self) -> u64 {
match self {
Self::Pg(stored) => stored.timestamp_ms as u64,
Self::Bigtable(kv) => kv.timestamp,
}
}
}
27 changes: 26 additions & 1 deletion crates/sui-indexer-alt-jsonrpc/src/data/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ use std::{
use async_graphql::dataloader::Loader;
use diesel::{ExpressionMethods, QueryDsl};
use sui_indexer_alt_schema::{schema::kv_transactions, transactions::StoredTransaction};
use sui_kvstore::{KeyValueStoreReader, TransactionData as KVTransactionData};
use sui_types::digests::TransactionDigest;

use super::pg_reader::PgReader;
use super::{bigtable_reader::BigtableReader, pg_reader::PgReader};
use crate::data::error::Error;

/// Key for fetching transaction contents (TransactionData, Effects, and Events) by digest.
Expand Down Expand Up @@ -55,3 +56,27 @@ impl Loader<TransactionKey> for PgReader {
.collect())
}
}

#[async_trait::async_trait]
impl Loader<TransactionKey> for BigtableReader {
type Value = KVTransactionData;
type Error = Arc<Error>;

async fn load(
&self,
keys: &[TransactionKey],
) -> Result<HashMap<TransactionKey, Self::Value>, Self::Error> {
let digests: Vec<_> = keys.iter().map(|k| k.0).collect();
let transactions = self
.0
.clone()
.get_transactions(&digests)
.await
.map_err(|e| Arc::new(Error::BigtableRead(e)))?;

Ok(transactions
.into_iter()
.map(|t| (TransactionKey(*t.transaction.digest()), t))
.collect())
}
}

0 comments on commit 063d223

Please sign in to comment.