Skip to content

Commit fe842e2

Browse files
authored
fix(brc20): verify ordinal transfers in chunks instead of individually (#394)
* chore: group transfers * fix: finish integration * fix: chunk query * chunk size * test: indexing * fix: comments
1 parent 51d3a76 commit fe842e2

File tree

7 files changed

+728
-366
lines changed

7 files changed

+728
-366
lines changed

components/chainhook-postgres/src/lib.rs

+5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ pub use tokio_postgres;
77

88
use tokio_postgres::{Client, Config, NoTls, Row};
99

10+
/// Standard chunk size to use when we're batching multiple query inserts into a single SQL statement to save on DB round trips.
11+
/// This number is designed to not hit the postgres limit of 65535 query parameters in a single SQL statement, but results may
12+
/// vary depending on column counts. Queries should use other custom chunk sizes as needed.
13+
pub const BATCH_QUERY_CHUNK_SIZE: usize = 500;
14+
1015
/// A Postgres configuration for a single database.
1116
#[derive(Clone, Debug)]
1217
pub struct PgConnectionConfig {

components/ordhook-core/src/core/meta_protocols/brc20/brc20_pg.rs

+54-23
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use chainhook_postgres::{
44
deadpool_postgres::GenericClient,
55
tokio_postgres::{types::ToSql, Client},
66
types::{PgNumericU128, PgNumericU64},
7-
utils, FromPgRow,
7+
utils, FromPgRow, BATCH_QUERY_CHUNK_SIZE,
88
};
99
use chainhook_sdk::types::{
1010
BitcoinBlockData, Brc20BalanceData, Brc20Operation, Brc20TokenDeployData, Brc20TransferData,
@@ -79,24 +79,43 @@ pub async fn get_token_available_balance_for_address<T: GenericClient>(
7979
Ok(Some(supply.0))
8080
}
8181

82-
pub async fn get_unsent_token_transfer<T: GenericClient>(
83-
ordinal_number: u64,
82+
pub async fn get_unsent_token_transfers<T: GenericClient>(
83+
ordinal_numbers: &Vec<u64>,
8484
client: &T,
85-
) -> Result<Option<DbOperation>, String> {
86-
let row = client
87-
.query_opt(
88-
"SELECT * FROM operations
89-
WHERE ordinal_number = $1 AND operation = 'transfer'
90-
AND NOT EXISTS (SELECT 1 FROM operations WHERE ordinal_number = $1 AND operation = 'transfer_send')
91-
LIMIT 1",
92-
&[&PgNumericU64(ordinal_number)],
93-
)
94-
.await
95-
.map_err(|e| format!("get_unsent_token_transfer: {e}"))?;
96-
let Some(row) = row else {
97-
return Ok(None);
98-
};
99-
Ok(Some(DbOperation::from_pg_row(&row)))
85+
) -> Result<Vec<DbOperation>, String> {
86+
if ordinal_numbers.is_empty() {
87+
return Ok(vec![]);
88+
}
89+
let mut results = vec![];
90+
// We can afford a larger chunk size here because we're only using one parameter per ordinal number value.
91+
for chunk in ordinal_numbers.chunks(5000) {
92+
let mut wrapped = Vec::with_capacity(chunk.len());
93+
for n in chunk {
94+
wrapped.push(PgNumericU64(*n));
95+
}
96+
let mut params = vec![];
97+
for number in wrapped.iter() {
98+
params.push(number);
99+
}
100+
let rows = client
101+
.query(
102+
"SELECT *
103+
FROM operations o
104+
WHERE operation = 'transfer'
105+
AND o.ordinal_number = ANY($1)
106+
AND NOT EXISTS (
107+
SELECT 1 FROM operations
108+
WHERE ordinal_number = o.ordinal_number
109+
AND operation = 'transfer_send'
110+
)
111+
LIMIT 1",
112+
&[&params],
113+
)
114+
.await
115+
.map_err(|e| format!("get_unsent_token_transfers: {e}"))?;
116+
results.extend(rows.iter().map(|row| DbOperation::from_pg_row(row)));
117+
}
118+
Ok(results)
100119
}
101120

102121
pub async fn insert_tokens<T: GenericClient>(
@@ -106,7 +125,7 @@ pub async fn insert_tokens<T: GenericClient>(
106125
if tokens.len() == 0 {
107126
return Ok(());
108127
}
109-
for chunk in tokens.chunks(500) {
128+
for chunk in tokens.chunks(BATCH_QUERY_CHUNK_SIZE) {
110129
let mut params: Vec<&(dyn ToSql + Sync)> = vec![];
111130
for row in chunk.iter() {
112131
params.push(&row.ticker);
@@ -148,7 +167,7 @@ pub async fn insert_operations<T: GenericClient>(
148167
if operations.len() == 0 {
149168
return Ok(());
150169
}
151-
for chunk in operations.chunks(500) {
170+
for chunk in operations.chunks(BATCH_QUERY_CHUNK_SIZE) {
152171
let mut params: Vec<&(dyn ToSql + Sync)> = vec![];
153172
for row in chunk.iter() {
154173
params.push(&row.ticker);
@@ -253,7 +272,11 @@ pub async fn update_address_operation_counts<T: GenericClient>(
253272
if counts.len() == 0 {
254273
return Ok(());
255274
}
256-
for chunk in counts.keys().collect::<Vec<&String>>().chunks(500) {
275+
for chunk in counts
276+
.keys()
277+
.collect::<Vec<&String>>()
278+
.chunks(BATCH_QUERY_CHUNK_SIZE)
279+
{
257280
let mut params: Vec<&(dyn ToSql + Sync)> = vec![];
258281
let mut insert_rows = 0;
259282
for address in chunk {
@@ -287,7 +310,11 @@ pub async fn update_token_operation_counts<T: GenericClient>(
287310
if counts.len() == 0 {
288311
return Ok(());
289312
}
290-
for chunk in counts.keys().collect::<Vec<&String>>().chunks(500) {
313+
for chunk in counts
314+
.keys()
315+
.collect::<Vec<&String>>()
316+
.chunks(BATCH_QUERY_CHUNK_SIZE)
317+
{
291318
let mut converted = HashMap::new();
292319
for tick in chunk {
293320
converted.insert(*tick, counts.get(*tick).unwrap().to_string());
@@ -324,7 +351,11 @@ pub async fn update_token_minted_supplies<T: GenericClient>(
324351
if supplies.len() == 0 {
325352
return Ok(());
326353
}
327-
for chunk in supplies.keys().collect::<Vec<&String>>().chunks(500) {
354+
for chunk in supplies
355+
.keys()
356+
.collect::<Vec<&String>>()
357+
.chunks(BATCH_QUERY_CHUNK_SIZE)
358+
{
328359
let mut converted = HashMap::new();
329360
for tick in chunk {
330361
converted.insert(*tick, supplies.get(*tick).unwrap().0.to_string());

components/ordhook-core/src/core/meta_protocols/brc20/cache.rs

+39-22
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::{collections::HashMap, num::NonZeroUsize};
1+
use std::{
2+
collections::{HashMap, HashSet},
3+
num::NonZeroUsize,
4+
};
25

36
use chainhook_postgres::{
47
deadpool_postgres::GenericClient,
@@ -146,30 +149,44 @@ impl Brc20MemoryCache {
146149
return Ok(None);
147150
}
148151

149-
pub async fn get_unsent_token_transfer<T: GenericClient>(
152+
pub async fn get_unsent_token_transfers<T: GenericClient>(
150153
&mut self,
151-
ordinal_number: u64,
154+
ordinal_numbers: &Vec<&u64>,
152155
client: &T,
153-
) -> Result<Option<DbOperation>, String> {
154-
// Use `get` instead of `contains` so we promote this value in the LRU.
155-
if let Some(_) = self.ignored_inscriptions.get(&ordinal_number) {
156-
return Ok(None);
157-
}
158-
if let Some(row) = self.unsent_transfers.get(&ordinal_number) {
159-
return Ok(Some(row.clone()));
156+
) -> Result<Vec<DbOperation>, String> {
157+
let mut results = vec![];
158+
let mut cache_missed_ordinal_numbers = HashSet::new();
159+
for ordinal_number in ordinal_numbers.iter() {
160+
// Use `get` instead of `contains` so we promote this value in the LRU.
161+
if let Some(_) = self.ignored_inscriptions.get(*ordinal_number) {
162+
continue;
163+
}
164+
if let Some(row) = self.unsent_transfers.get(*ordinal_number) {
165+
results.push(row.clone());
166+
} else {
167+
cache_missed_ordinal_numbers.insert(**ordinal_number);
168+
}
160169
}
161-
self.handle_cache_miss(client).await?;
162-
match brc20_pg::get_unsent_token_transfer(ordinal_number, client).await? {
163-
Some(row) => {
164-
self.unsent_transfers.put(ordinal_number, row.clone());
165-
return Ok(Some(row));
170+
if !cache_missed_ordinal_numbers.is_empty() {
171+
// Some ordinal numbers were not in cache, check DB.
172+
self.handle_cache_miss(client).await?;
173+
let pending_transfers = brc20_pg::get_unsent_token_transfers(
174+
&cache_missed_ordinal_numbers.iter().cloned().collect(),
175+
client,
176+
)
177+
.await?;
178+
for unsent_transfer in pending_transfers.into_iter() {
179+
cache_missed_ordinal_numbers.remove(&unsent_transfer.ordinal_number.0);
180+
self.unsent_transfers
181+
.put(unsent_transfer.ordinal_number.0, unsent_transfer.clone());
182+
results.push(unsent_transfer);
166183
}
167-
None => {
168-
// Inscription is not relevant for BRC20.
169-
self.ignore_inscription(ordinal_number);
170-
return Ok(None);
184+
// Ignore all irrelevant numbers.
185+
for irrelevant_number in cache_missed_ordinal_numbers.iter() {
186+
self.ignore_inscription(*irrelevant_number);
171187
}
172188
}
189+
return Ok(results);
173190
}
174191

175192
/// Marks an ordinal number as ignored so we don't bother computing its transfers for BRC20 purposes.
@@ -456,12 +473,12 @@ impl Brc20MemoryCache {
456473
return Ok(transfer.clone());
457474
}
458475
self.handle_cache_miss(client).await?;
459-
let Some(transfer) = brc20_pg::get_unsent_token_transfer(ordinal_number, client).await?
460-
else {
476+
let transfers = brc20_pg::get_unsent_token_transfers(&vec![ordinal_number], client).await?;
477+
let Some(transfer) = transfers.first() else {
461478
unreachable!("Invalid transfer ordinal number {}", ordinal_number)
462479
};
463480
self.unsent_transfers.put(ordinal_number, transfer.clone());
464-
return Ok(transfer);
481+
return Ok(transfer.clone());
465482
}
466483

467484
async fn handle_cache_miss<T: GenericClient>(&mut self, client: &T) -> Result<(), String> {

0 commit comments

Comments
 (0)