Skip to content

Commit

Permalink
feat(torii): subscribe to token updates (metadata etc.)
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Feb 4, 2025
1 parent bf0c5ee commit e984885
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 7 deletions.
1 change: 1 addition & 0 deletions crates/torii/grpc/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ enum ComparisonOperator {
}

message Token {
string token_id = 1;
string contract_address = 2;
string name = 3;
string symbol = 4;
Expand Down
8 changes: 8 additions & 0 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ service World {
// Update token balance subscription
rpc UpdateTokenBalancesSubscription (UpdateTokenBalancesSubscriptionRequest) returns (google.protobuf.Empty);

// Subscribe to token updates.
rpc SubscribeTokens (RetrieveTokensRequest) returns (stream SubscribeTokensResponse);

// Retrieve entities
rpc RetrieveEventMessages (RetrieveEventMessagesRequest) returns (RetrieveEntitiesResponse);

Expand Down Expand Up @@ -85,6 +88,11 @@ message RetrieveTokensResponse {
repeated types.Token tokens = 1;
}

// A response containing token updates
message SubscribeTokensResponse {
types.Token token = 1;
}

// A request to retrieve token balances
message RetrieveTokenBalancesRequest {
// The account addresses to retrieve balances for
Expand Down
12 changes: 5 additions & 7 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,7 @@ use crate::proto::types::member_value::ValueType;
use crate::proto::types::LogicalOperator;
use crate::proto::world::world_server::WorldServer;
use crate::proto::world::{
RetrieveEntitiesStreamingResponse, RetrieveEventMessagesRequest, RetrieveTokenBalancesRequest,
RetrieveTokenBalancesResponse, RetrieveTokensRequest, RetrieveTokensResponse,
SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest,
SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse,
SubscribeTokenBalancesResponse, UpdateEventMessagesSubscriptionRequest,
UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse,
RetrieveEntitiesStreamingResponse, RetrieveEventMessagesRequest, RetrieveTokenBalancesRequest, RetrieveTokenBalancesResponse, RetrieveTokensRequest, RetrieveTokensResponse, SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse, SubscribeTokenBalancesResponse, SubscribeTokensResponse, UpdateEventMessagesSubscriptionRequest, UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse
};
use crate::proto::{self};
use crate::types::schema::SchemaError;
Expand Down Expand Up @@ -1248,6 +1243,8 @@ type RetrieveEntitiesStreamingResponseStream =
Pin<Box<dyn Stream<Item = Result<RetrieveEntitiesStreamingResponse, Status>> + Send>>;
type SubscribeTokenBalancesResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeTokenBalancesResponse, Status>> + Send>>;
type SubscribeTokensResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeTokensResponse, Status>> + Send>>;

#[tonic::async_trait]
impl proto::world::world_server::World for DojoWorld {
Expand All @@ -1258,7 +1255,8 @@ impl proto::world::world_server::World for DojoWorld {
type SubscribeIndexerStream = SubscribeIndexerResponseStream;
type RetrieveEntitiesStreamingStream = RetrieveEntitiesStreamingResponseStream;
type SubscribeTokenBalancesStream = SubscribeTokenBalancesResponseStream;

type SubscribeTokensStream = SubscribeTokensResponseStream;

async fn world_metadata(
&self,
_request: Request<WorldMetadataRequest>,
Expand Down
1 change: 1 addition & 0 deletions crates/torii/grpc/src/server/subscriptions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod event_message;
pub mod indexer;
pub mod model_diff;
pub mod token_balance;
pub mod token;

pub(crate) fn match_entity_keys(
id: Felt,
Expand Down
179 changes: 179 additions & 0 deletions crates/torii/grpc/src/server/subscriptions/token.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::{Stream, StreamExt};
use rand::Rng;
use starknet_crypto::Felt;
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::RwLock;
use torii_sqlite::error::{Error, ParseError};
use torii_sqlite::simple_broker::SimpleBroker;
use torii_sqlite::types::Token;
use tracing::{error, trace};

use crate::proto;
use crate::proto::world::{SubscribeTokenBalancesResponse, SubscribeTokensResponse};

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::balance";

#[derive(Debug)]
pub struct TokenSubscriber {
/// Contract addresses that the subscriber is interested in
/// If empty, subscriber receives updates for all contracts
pub contract_addresses: HashSet<Felt>,
/// The channel to send the response back to the subscriber.
pub sender: Sender<Result<SubscribeTokenBalancesResponse, tonic::Status>>,
}

#[derive(Debug, Default)]
pub struct TokenManager {
subscribers: RwLock<HashMap<u64, TokenSubscriber>>,
}

impl TokenManager {
pub async fn add_subscriber(
&self,
contract_addresses: Vec<Felt>,
) -> Result<Receiver<Result<SubscribeTokenBalancesResponse, tonic::Status>>, Error> {
let subscription_id = rand::thread_rng().gen::<u64>();
let (sender, receiver) = channel(1);

// Send initial empty response
let _ = sender
.send(Ok(SubscribeTokenBalancesResponse { subscription_id, balance: None }))
.await;

self.subscribers.write().await.insert(
subscription_id,
TokenSubscriber {
contract_addresses: contract_addresses.into_iter().collect(),
sender,
},
);

Ok(receiver)
}

pub async fn update_subscriber(
&self,
id: u64,
contract_addresses: Vec<Felt>,
) {
let sender = {
let subscribers = self.subscribers.read().await;
if let Some(subscriber) = subscribers.get(&id) {
subscriber.sender.clone()
} else {
return; // Subscriber not found, exit early
}
};

self.subscribers.write().await.insert(
id,
TokenSubscriber {
contract_addresses: contract_addresses.into_iter().collect(),
sender,
},
);
}

pub(super) async fn remove_subscriber(&self, id: u64) {
self.subscribers.write().await.remove(&id);
}
}

#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
simple_broker: Pin<Box<dyn Stream<Item = Token> + Send>>,
balance_sender: UnboundedSender<Token>,
}

impl Service {
pub fn new(subs_manager: Arc<TokenManager>) -> Self {
let (balance_sender, balance_receiver) = unbounded_channel();
let service = Self {
simple_broker: Box::pin(SimpleBroker::<Token>::subscribe()),
balance_sender,
};

tokio::spawn(Self::publish_updates(subs_manager, balance_receiver));

service
}

async fn publish_updates(
subs: Arc<TokenManager>,
mut balance_receiver: UnboundedReceiver<Token>,
) {
while let Some(balance) = balance_receiver.recv().await {
if let Err(e) = Self::process_balance_update(&subs, &balance).await {
error!(target = LOG_TARGET, error = %e, "Processing balance update.");
}
}
}

async fn process_balance_update(
subs: &Arc<TokenManager>,
balance: &Token,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();

for (idx, sub) in subs.subscribers.read().await.iter() {
let contract_address =
Felt::from_str(&balance.contract_address).map_err(ParseError::FromStr)?;

// Skip if contract address filter doesn't match
if !sub.contract_addresses.is_empty()
&& !sub.contract_addresses.contains(&contract_address)
{
continue;
}


let resp = SubscribeTokensResponse {
token: Some(proto::types::Token {
id: balance.id.clone(),
contract_address: balance.contract_address.clone(),
name: balance.name.clone(),
symbol: balance.symbol.clone(),
decimals: balance.decimals,
metadata: balance.metadata.clone(),
}),
};

if sub.sender.send(Ok(resp)).await.is_err() {
closed_stream.push(*idx);
}
}

for id in closed_stream {
trace!(target = LOG_TARGET, id = %id, "Closing balance stream.");
subs.remove_subscriber(id).await
}

Ok(())
}
}

impl Future for Service {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(balance)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.balance_sender.send(balance) {
error!(target = LOG_TARGET, error = %e, "Sending balance update to processor.");
}
}

Poll::Pending
}
}

0 comments on commit e984885

Please sign in to comment.