Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii-grpc): subscribe to token updates (metadata etc.) #2990

Merged
merged 5 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/torii/grpc/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ enum ComparisonOperator {
}

message Token {
string token_id = 1;
string contract_address = 2;
string name = 3;
string symbol = 4;
Expand Down
8 changes: 8 additions & 0 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ service World {
// Update token balance subscription
rpc UpdateTokenBalancesSubscription (UpdateTokenBalancesSubscriptionRequest) returns (google.protobuf.Empty);

// Subscribe to token updates.
rpc SubscribeTokens (RetrieveTokensRequest) returns (stream SubscribeTokensResponse);

// Retrieve entities
rpc RetrieveEventMessages (RetrieveEventMessagesRequest) returns (RetrieveEntitiesResponse);

Expand Down Expand Up @@ -96,6 +99,11 @@ message RetrieveTokensResponse {
repeated types.Token tokens = 1;
}

// A response containing token updates
message SubscribeTokensResponse {
types.Token token = 1;
}

// A request to retrieve token balances
message RetrieveTokenBalancesRequest {
// The account addresses to retrieve balances for
Expand Down
38 changes: 36 additions & 2 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
use starknet::providers::JsonRpcClient;
use subscriptions::event::EventManager;
use subscriptions::indexer::IndexerManager;
use subscriptions::token::TokenManager;
use subscriptions::token_balance::TokenBalanceManager;
use tokio::net::TcpListener;
use tokio::sync::mpsc::{channel, Receiver};
Expand Down Expand Up @@ -61,8 +62,8 @@
RetrieveTokensRequest, RetrieveTokensResponse, SubscribeEntitiesRequest,
SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsResponse,
SubscribeIndexerRequest, SubscribeIndexerResponse, SubscribeTokenBalancesResponse,
UpdateEventMessagesSubscriptionRequest, UpdateTokenBalancesSubscriptionRequest,
WorldMetadataRequest, WorldMetadataResponse,
SubscribeTokensResponse, UpdateEventMessagesSubscriptionRequest,
UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse,
};
use crate::proto::{self};
use crate::types::schema::SchemaError;
Expand Down Expand Up @@ -96,6 +97,7 @@
impl From<Token> for proto::types::Token {
fn from(value: Token) -> Self {
Self {
token_id: value.id,

Check warning on line 100 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L100

Added line #L100 was not covered by tests
contract_address: value.contract_address,
name: value.name,
symbol: value.symbol,
Expand Down Expand Up @@ -127,6 +129,7 @@
state_diff_manager: Arc<StateDiffManager>,
indexer_manager: Arc<IndexerManager>,
token_balance_manager: Arc<TokenBalanceManager>,
token_manager: Arc<TokenManager>,
}

impl DojoWorld {
Expand All @@ -143,6 +146,7 @@
let state_diff_manager = Arc::new(StateDiffManager::default());
let indexer_manager = Arc::new(IndexerManager::default());
let token_balance_manager = Arc::new(TokenBalanceManager::default());
let token_manager = Arc::new(TokenManager::default());

tokio::task::spawn(subscriptions::model_diff::Service::new_with_block_rcv(
block_rx,
Expand All @@ -165,6 +169,8 @@
&token_balance_manager,
)));

tokio::task::spawn(subscriptions::token::Service::new(Arc::clone(&token_manager)));

Self {
pool,
world_address,
Expand All @@ -175,6 +181,7 @@
state_diff_manager,
indexer_manager,
token_balance_manager,
token_manager,
}
}
}
Expand Down Expand Up @@ -790,6 +797,13 @@
Ok(RetrieveTokensResponse { tokens })
}

async fn subscribe_tokens(
&self,
contract_addresses: Vec<Felt>,
) -> Result<Receiver<Result<SubscribeTokensResponse, tonic::Status>>, Error> {
self.token_manager.add_subscriber(contract_addresses).await
}

Check warning on line 805 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L800-L805

Added lines #L800 - L805 were not covered by tests

async fn retrieve_token_balances(
&self,
account_addresses: Vec<Felt>,
Expand Down Expand Up @@ -1281,6 +1295,8 @@
Pin<Box<dyn Stream<Item = Result<RetrieveEntitiesStreamingResponse, Status>> + Send>>;
type SubscribeTokenBalancesResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeTokenBalancesResponse, Status>> + Send>>;
type SubscribeTokensResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeTokensResponse, Status>> + Send>>;

#[tonic::async_trait]
impl proto::world::world_server::World for DojoWorld {
Expand All @@ -1291,6 +1307,7 @@
type SubscribeIndexerStream = SubscribeIndexerResponseStream;
type RetrieveEntitiesStreamingStream = RetrieveEntitiesStreamingResponseStream;
type SubscribeTokenBalancesStream = SubscribeTokenBalancesResponseStream;
type SubscribeTokensStream = SubscribeTokensResponseStream;

async fn world_metadata(
&self,
Expand Down Expand Up @@ -1338,6 +1355,23 @@
Ok(Response::new(tokens))
}

async fn subscribe_tokens(
&self,
request: Request<RetrieveTokensRequest>,
) -> ServiceResult<Self::SubscribeTokensStream> {
let RetrieveTokensRequest { contract_addresses } = request.into_inner();
let contract_addresses = contract_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();

Check warning on line 1366 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1361-L1366

Added lines #L1361 - L1366 were not covered by tests

let rx = self
.subscribe_tokens(contract_addresses)
.await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeTokensStream))
}

Check warning on line 1373 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1368-L1373

Added lines #L1368 - L1373 were not covered by tests

async fn retrieve_token_balances(
&self,
request: Request<RetrieveTokenBalancesRequest>,
Expand Down
1 change: 1 addition & 0 deletions crates/torii/grpc/src/server/subscriptions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod event;
pub mod event_message;
pub mod indexer;
pub mod model_diff;
pub mod token;
pub mod token_balance;

pub(crate) fn match_entity_keys(
Expand Down
167 changes: 167 additions & 0 deletions crates/torii/grpc/src/server/subscriptions/token.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::{Stream, StreamExt};
use rand::Rng;
use starknet_crypto::Felt;
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::RwLock;
use torii_sqlite::error::{Error, ParseError};
use torii_sqlite::simple_broker::SimpleBroker;
use torii_sqlite::types::Token;
use tracing::{error, trace};

use crate::proto;
use crate::proto::world::SubscribeTokensResponse;

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::balance";

#[derive(Debug)]
pub struct TokenSubscriber {
/// Contract addresses that the subscriber is interested in
/// If empty, subscriber receives updates for all contracts
pub contract_addresses: HashSet<Felt>,
/// The channel to send the response back to the subscriber.
pub sender: Sender<Result<SubscribeTokensResponse, tonic::Status>>,
}

#[derive(Debug, Default)]
pub struct TokenManager {
subscribers: RwLock<HashMap<u64, TokenSubscriber>>,
}

impl TokenManager {
pub async fn add_subscriber(
&self,
contract_addresses: Vec<Felt>,
) -> Result<Receiver<Result<SubscribeTokensResponse, tonic::Status>>, Error> {
let subscription_id = rand::thread_rng().gen::<u64>();
let (sender, receiver) = channel(1);

// Send initial empty response
let _ = sender.send(Ok(SubscribeTokensResponse { token: None })).await;

Check warning on line 48 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L40-L48

Added lines #L40 - L48 were not covered by tests

self.subscribers.write().await.insert(
subscription_id,
TokenSubscriber {
contract_addresses: contract_addresses.into_iter().collect(),
sender,
},
);

Ok(receiver)
}

Check warning on line 59 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L50-L59

Added lines #L50 - L59 were not covered by tests

pub async fn update_subscriber(&self, id: u64, contract_addresses: Vec<Felt>) {
let sender = {
let subscribers = self.subscribers.read().await;
if let Some(subscriber) = subscribers.get(&id) {
subscriber.sender.clone()

Check warning on line 65 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L61-L65

Added lines #L61 - L65 were not covered by tests
} else {
return; // Subscriber not found, exit early

Check warning on line 67 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L67

Added line #L67 was not covered by tests
}
};

self.subscribers.write().await.insert(
id,
TokenSubscriber {
contract_addresses: contract_addresses.into_iter().collect(),
sender,
},
);
}

Check warning on line 78 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L71-L78

Added lines #L71 - L78 were not covered by tests

pub(super) async fn remove_subscriber(&self, id: u64) {
self.subscribers.write().await.remove(&id);
}

Check warning on line 82 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L80-L82

Added lines #L80 - L82 were not covered by tests
}

#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
simple_broker: Pin<Box<dyn Stream<Item = Token> + Send>>,
balance_sender: UnboundedSender<Token>,
}

impl Service {
pub fn new(subs_manager: Arc<TokenManager>) -> Self {
let (balance_sender, balance_receiver) = unbounded_channel();
let service =
Self { simple_broker: Box::pin(SimpleBroker::<Token>::subscribe()), balance_sender };

tokio::spawn(Self::publish_updates(subs_manager, balance_receiver));

service
}

async fn publish_updates(
subs: Arc<TokenManager>,
mut balance_receiver: UnboundedReceiver<Token>,
) {
while let Some(balance) = balance_receiver.recv().await {
if let Err(e) = Self::process_balance_update(&subs, &balance).await {
error!(target = LOG_TARGET, error = %e, "Processing balance update.");
}

Check warning on line 110 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L108-L110

Added lines #L108 - L110 were not covered by tests
}
}

Check warning on line 112 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L112

Added line #L112 was not covered by tests

async fn process_balance_update(subs: &Arc<TokenManager>, token: &Token) -> Result<(), Error> {
let mut closed_stream = Vec::new();

Check warning on line 115 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L114-L115

Added lines #L114 - L115 were not covered by tests

for (idx, sub) in subs.subscribers.read().await.iter() {
let contract_address =
Felt::from_str(&token.contract_address).map_err(ParseError::FromStr)?;

Check warning on line 119 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L117-L119

Added lines #L117 - L119 were not covered by tests

// Skip if contract address filter doesn't match
if !sub.contract_addresses.is_empty()
&& !sub.contract_addresses.contains(&contract_address)

Check warning on line 123 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L122-L123

Added lines #L122 - L123 were not covered by tests
{
continue;
}

let resp = SubscribeTokensResponse {
token: Some(proto::types::Token {
token_id: token.id.clone(),
contract_address: token.contract_address.clone(),
name: token.name.clone(),
symbol: token.symbol.clone(),
decimals: token.decimals as u32,
metadata: token.metadata.clone(),
}),
};

if sub.sender.send(Ok(resp)).await.is_err() {
closed_stream.push(*idx);
}

Check warning on line 141 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L125-L141

Added lines #L125 - L141 were not covered by tests
}

for id in closed_stream {
trace!(target = LOG_TARGET, id = %id, "Closing balance stream.");
subs.remove_subscriber(id).await

Check warning on line 146 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L144-L146

Added lines #L144 - L146 were not covered by tests
}

Ok(())
}

Check warning on line 150 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L149-L150

Added lines #L149 - L150 were not covered by tests
}

impl Future for Service {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(balance)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.balance_sender.send(balance) {
error!(target = LOG_TARGET, error = %e, "Sending balance update to processor.");
}

Check warning on line 162 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L160-L162

Added lines #L160 - L162 were not covered by tests
}

Poll::Pending
}
}
Loading