Skip to content

Commit

Permalink
scope erc metadat tasks semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Feb 24, 2025
1 parent ad9e489 commit 4cb3764
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 15 deletions.
16 changes: 8 additions & 8 deletions crates/torii/cli/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub const DEFAULT_RELAY_PORT: u16 = 9090;
pub const DEFAULT_RELAY_WEBRTC_PORT: u16 = 9091;
pub const DEFAULT_RELAY_WEBSOCKET_PORT: u16 = 9092;

pub const DEFAULT_ERC_MAX_CONCURRENT_TASKS: usize = 10;
pub const DEFAULT_ERC_MAX_METADATA_TASKS: usize = 10;

#[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq)]
#[command(next_help_heading = "Relay options")]
Expand Down Expand Up @@ -354,12 +354,12 @@ pub struct ErcOptions {
/// The maximum number of concurrent tasks to use for indexing ERC721 and ERC1155 token
/// metadata.
#[arg(
long = "erc.max_concurrent_tasks",
default_value_t = DEFAULT_ERC_MAX_CONCURRENT_TASKS,
long = "erc.max_metadata_tasks",
default_value_t = DEFAULT_ERC_MAX_METADATA_TASKS,
help = "The maximum number of concurrent tasks to use for indexing ERC721 and ERC1155 token metadata."
)]
#[serde(default = "default_erc_max_concurrent_tasks")]
pub max_concurrent_tasks: usize,
#[serde(default = "default_erc_max_metadata_tasks")]
pub max_metadata_tasks: usize,

Check warning on line 362 in crates/torii/cli/src/options.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/cli/src/options.rs#L362

Added line #L362 was not covered by tests

/// Path to a directory to store ERC artifacts
#[arg(long)]
Expand All @@ -368,7 +368,7 @@ pub struct ErcOptions {

impl Default for ErcOptions {
fn default() -> Self {
Self { max_concurrent_tasks: DEFAULT_MAX_CONCURRENT_TASKS, artifacts_path: None }
Self { max_metadata_tasks: DEFAULT_ERC_MAX_METADATA_TASKS, artifacts_path: None }
}
}

Expand Down Expand Up @@ -460,6 +460,6 @@ fn default_relay_websocket_port() -> u16 {
DEFAULT_RELAY_WEBSOCKET_PORT
}

fn default_erc_max_concurrent_tasks() -> usize {
DEFAULT_ERC_MAX_CONCURRENT_TASKS
fn default_erc_max_metadata_tasks() -> usize {
DEFAULT_ERC_MAX_METADATA_TASKS
}

Check warning on line 465 in crates/torii/cli/src/options.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/cli/src/options.rs#L463-L465

Added lines #L463 - L465 were not covered by tests
2 changes: 1 addition & 1 deletion crates/torii/runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl Runner {
pool.clone(),
shutdown_tx.clone(),
provider.clone(),
self.args.erc.max_concurrent_tasks,
self.args.erc.max_metadata_tasks,

Check warning on line 145 in crates/torii/runner/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/runner/src/lib.rs#L145

Added line #L145 was not covered by tests
)
.await?;
let executor_handle = tokio::spawn(async move { executor.run().await });
Expand Down
12 changes: 6 additions & 6 deletions crates/torii/sqlite/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ pub struct Executor<'c, P: Provider + Sync + Send + 'static> {
// It is used to make RPC calls to fetch token_uri data for erc721 contracts
provider: Arc<P>,
// Used to limit number of tasks that run in parallel to fetch metadata
semaphore: Arc<Semaphore>,
metadata_semaphore: Arc<Semaphore>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -234,13 +234,13 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
pool: Pool<Sqlite>,
shutdown_tx: Sender<()>,
provider: Arc<P>,
max_concurrent_tasks: usize,
max_metadata_tasks: usize,
) -> Result<(Self, UnboundedSender<QueryMessage>)> {
let (tx, rx) = unbounded_channel();
let transaction = pool.begin().await?;
let publish_queue = Vec::new();
let shutdown_rx = shutdown_tx.subscribe();
let semaphore = Arc::new(Semaphore::new(max_concurrent_tasks));
let metadata_semaphore = Arc::new(Semaphore::new(max_metadata_tasks));

Ok((
Executor {
Expand All @@ -252,7 +252,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
register_tasks: JoinSet::new(),
deferred_query_messages: Vec::new(),
provider,
semaphore,
metadata_semaphore,
},
tx,
))
Expand Down Expand Up @@ -603,7 +603,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), "Applied balance diff.");
}
QueryType::RegisterNftToken(register_nft_token) => {
let semaphore = self.semaphore.clone();
let metadata_semaphore = self.metadata_semaphore.clone();
let provider = self.provider.clone();

let res = sqlx::query_as::<_, (String, String)>(&format!(
Expand Down Expand Up @@ -675,7 +675,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
};

self.register_tasks.spawn(async move {
let permit = semaphore.acquire().await.unwrap();
let permit = metadata_semaphore.acquire().await.unwrap();

let result = Self::process_register_nft_token_query(
register_nft_token,
Expand Down

0 comments on commit 4cb3764

Please sign in to comment.