Skip to content

Commit

Permalink
comments, including rafactor to split template generator, enricher an…
Browse files Browse the repository at this point in the history
…d executor
  • Loading branch information
gegaowp committed Feb 3, 2025
1 parent bb6c34c commit 592ae3d
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 188 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-rpc-benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ sui-indexer-alt-framework.workspace = true
telemetry-subscribers.workspace = true
tracing.workspace = true
tokio = { workspace = true, features = ["full"] }
url.workspace = true
tokio-postgres = "0.7.12"
bb8 = "0.9.0"
bb8-postgres = "0.9.0"
Expand Down
7 changes: 4 additions & 3 deletions crates/sui-rpc-benchmark/src/direct/benchmark_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ use std::time::Duration;
pub struct BenchmarkConfig {
/// Number of concurrent clients
pub concurrency: usize,
/// Duration to run the benchmark
pub duration: Duration,
/// All queries will execute if they finish within the specified timeout.
/// Otherwise, the binary will stop at that time and report collected metrics.
pub timeout: Duration,
}

impl Default for BenchmarkConfig {
fn default() -> Self {
Self {
concurrency: 50,
duration: Duration::from_secs(30),
timeout: Duration::from_secs(30),
}
}
}
15 changes: 11 additions & 4 deletions crates/sui-rpc-benchmark/src/direct/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
use dashmap::DashMap;
use std::sync::Arc;
use std::time::Duration;
use tracing::debug;

use super::query_template_generator::QueryTemplate;

#[derive(Debug, Default)]
pub struct QueryMetrics {
Expand Down Expand Up @@ -38,17 +41,21 @@ pub struct MetricsCollector {

impl MetricsCollector {
/// Records a query execution with its latency and error status
///
///
/// # Arguments
/// * `query_type` - The type/name of the query being recorded
/// * `query_template` - The QueryTemplate being recorded
/// * `latency` - The duration taken to execute the query
/// * `is_error` - Whether the query resulted in an error
pub fn record_query(&self, query_type: &str, latency: Duration, is_error: bool) {
let mut entry = self.metrics.entry(query_type.to_string()).or_default();
pub fn record_query(&self, query_template: QueryTemplate, latency: Duration, is_error: bool) {
let mut entry = self
.metrics
.entry(query_template.table_name.to_string())
.or_default();

entry.total_queries += 1;
if is_error {
entry.errors += 1;
debug!("Error executing query: {:?}", query_template);
} else {
entry.latency_ms.push(latency.as_secs_f64() * 1000.0);
}
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-rpc-benchmark/src/direct/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@

pub mod benchmark_config;
pub mod metrics;
pub mod query_enricher;
pub mod query_executor;
pub mod query_generator;
pub mod query_template_generator;
107 changes: 107 additions & 0 deletions crates/sui-rpc-benchmark/src/direct/query_enricher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

/// This module enriches query templates with real data from the database.
/// This enrichment ensures that when we run the benchmark:
/// - We use realistic data values that actually exist in the database:
/// - We have a pool of valid values to randomly select from during execution.
use anyhow::Result;
use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use tokio_postgres::{types::Type, NoTls, Row};
use tracing::warn;
use url::Url;

use crate::direct::query_template_generator::QueryTemplate;

#[derive(Clone, Debug)]
pub enum SqlValue {
Text(Option<String>),
Int4(Option<i32>),
Int8(Option<i64>),
Float8(Option<f64>),
Bool(Option<bool>),
Int2(Option<i16>),
Bytea(Option<Vec<u8>>),
}

#[derive(Debug, Clone)]
pub struct EnrichedBenchmarkQuery {
pub query: QueryTemplate,
pub rows: Vec<Vec<SqlValue>>,
pub types: Vec<Type>,
}

pub struct QueryEnricher {
pool: Pool<PostgresConnectionManager<NoTls>>,
}

impl QueryEnricher {
pub async fn new(db_url: &Url) -> Result<Self> {
let manager = PostgresConnectionManager::new_from_stringlike(db_url.as_str(), NoTls)?;
let pool = Pool::builder().build(manager).await?;
Ok(Self { pool })
}

fn row_to_values(row: &Row) -> Vec<SqlValue> {
(0..row.len())
.map(|i| match row.columns()[i].type_() {
&Type::TEXT | &Type::VARCHAR => SqlValue::Text(row.get(i)),
&Type::INT4 => SqlValue::Int4(row.get(i)),
&Type::INT8 => SqlValue::Int8(row.get(i)),
&Type::FLOAT8 => SqlValue::Float8(row.get(i)),
&Type::BOOL => SqlValue::Bool(row.get(i)),
&Type::INT2 => SqlValue::Int2(row.get(i)),
&Type::BYTEA => SqlValue::Bytea(row.get(i)),
ty => panic!("Unsupported type: {:?}", ty),
})
.collect()
}

async fn enrich_query(&self, query: &QueryTemplate) -> Result<EnrichedBenchmarkQuery> {
let client = self.pool.get().await?;
let sql = format!(
"SELECT {} FROM {} WHERE {} IS NOT NULL LIMIT 1000",
query.needed_columns.join(", "),
query.table_name,
query.needed_columns[0]
);

let rows = client.query(&sql, &[]).await?;
let Some(first_row) = rows.first() else {
warn!(
table = query.table_name,
"No sample data found for query on table, table is empty."
);
return Ok(EnrichedBenchmarkQuery {
query: query.clone(),
rows: Vec::new(),
types: query.needed_columns.iter().map(|_| Type::TEXT).collect(), // default type
});
};
let types = first_row
.columns()
.iter()
.map(|c| c.type_().clone())
.collect();
let raw_rows = rows.iter().map(Self::row_to_values).collect();

Ok(EnrichedBenchmarkQuery {
query: query.clone(),
rows: raw_rows,
types,
})
}

pub async fn enrich_queries(
&self,
queries: Vec<QueryTemplate>,
) -> Result<Vec<EnrichedBenchmarkQuery>> {
let mut enriched_queries = Vec::new();
for query in queries {
let enriched = self.enrich_query(&query).await?;
enriched_queries.push(enriched);
}
Ok(enriched_queries)
}
}
126 changes: 16 additions & 110 deletions crates/sui-rpc-benchmark/src/direct/query_executor.rs
Original file line number Diff line number Diff line change
@@ -1,148 +1,58 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

/// This module executes enriched benchmark queries against the database.
/// Each query's execution is timed and recorded via MetricsCollector.
/// And the results are aggregated and reported via BenchmarkResult.
use std::time::Instant;

use anyhow::Result;
use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use rand::seq::SliceRandom;
use rand::SeedableRng;
use sui_indexer_alt_framework::task::TrySpawnStreamExt;
use tokio_postgres::{types::ToSql, types::Type, NoTls, Row};
use tokio_postgres::{types::ToSql, NoTls};
use tracing::info;
use url::Url;

use crate::direct::benchmark_config::BenchmarkConfig;
use crate::direct::metrics::{BenchmarkResult, MetricsCollector};
use crate::direct::query_generator::BenchmarkQuery;
use crate::direct::query_enricher::{EnrichedBenchmarkQuery, SqlValue};

/// This module contains the QueryExecutor, which coordinates benchmark queries
/// against the database. It can “enrich” each BenchmarkQuery by sampling real
/// data from the relevant table. Each query’s execution is timed and recorded
/// via MetricsCollector, which is defined in the metrics module.
pub struct QueryExecutor {
pool: Pool<PostgresConnectionManager<NoTls>>,
queries: Vec<BenchmarkQuery>,
enriched_queries: Vec<EnrichedBenchmarkQuery>,
config: BenchmarkConfig,
metrics: MetricsCollector,
}

/// Represents strongly typed SQL values used in parametric queries.
/// Storing them as an enum allows us to handle different column types
/// transparently when performing random queries from the database.
/// This approach lets us build parameter lists matching each column's
/// actual type at runtime, ensuring correct and safe query execution.
///
/// We store each value in an `Option` to handle `NULL` values that can
/// appear in database columns.
#[derive(Clone, Debug)]
pub enum SqlValue {
Text(Option<String>),
Int4(Option<i32>),
Int8(Option<i64>),
Float8(Option<f64>),
Bool(Option<bool>),
Int2(Option<i16>),
Bytea(Option<Vec<u8>>),
}

#[derive(Debug, Clone)]
pub struct EnrichedBenchmarkQuery {
pub query: BenchmarkQuery,
pub rows: Vec<Vec<SqlValue>>,
pub types: Vec<Type>,
}

impl QueryExecutor {
pub async fn new(
db_url: &str,
queries: Vec<BenchmarkQuery>,
db_url: &Url,
enriched_queries: Vec<EnrichedBenchmarkQuery>,
config: BenchmarkConfig,
) -> Result<Self> {
let manager = PostgresConnectionManager::new_from_stringlike(db_url, NoTls)?;
let manager = PostgresConnectionManager::new_from_stringlike(db_url.as_str(), NoTls)?;
let pool = Pool::builder().build(manager).await?;

Ok(Self {
pool,
queries,
enriched_queries: Vec::new(),
enriched_queries,
config,
metrics: MetricsCollector::default(),
})
}

fn row_to_values(row: &Row) -> Vec<SqlValue> {
(0..row.len())
.map(|i| match row.columns()[i].type_() {
&Type::TEXT | &Type::VARCHAR => SqlValue::Text(row.get(i)),
&Type::INT4 => SqlValue::Int4(row.get(i)),
&Type::INT8 => SqlValue::Int8(row.get(i)),
&Type::FLOAT8 => SqlValue::Float8(row.get(i)),
&Type::BOOL => SqlValue::Bool(row.get(i)),
&Type::INT2 => SqlValue::Int2(row.get(i)),
&Type::BYTEA => SqlValue::Bytea(row.get(i)),
ty => panic!("Unsupported type: {:?}", ty),
})
.collect()
}

/// "Enriching" a query involves discovering valid column values for
/// placeholders. By sampling data from the table, we can produce
/// realistic sets of parameters, rather than random or empty
/// placeholders, leading to more accurate benchmark results.
async fn enrich_query(&self, query: &BenchmarkQuery) -> Result<EnrichedBenchmarkQuery> {
let client = self.pool.get().await?;
let sql = format!(
"SELECT DISTINCT {} FROM {} WHERE {} IS NOT NULL LIMIT 1000",
query.needed_columns.join(", "),
query.table_name,
query.needed_columns[0]
);

let rows = client.query(&sql, &[]).await?;
if rows.is_empty() {
info!(
"Warning: No sample data found for query on table {}, table is empty",
query.table_name
);
return Ok(EnrichedBenchmarkQuery {
query: query.clone(),
rows: Vec::new(),
types: query.needed_columns.iter().map(|_| Type::TEXT).collect(), // default type
});
}

let types = rows[0]
.columns()
.iter()
.map(|c| c.type_().clone())
.collect();
let raw_rows = rows.iter().map(Self::row_to_values).collect();

Ok(EnrichedBenchmarkQuery {
query: query.clone(),
rows: raw_rows,
types,
})
}

pub async fn initialize_samples(&mut self) -> Result<()> {
for query in &self.queries.clone() {
let enriched = self.enrich_query(query).await?;
self.enriched_queries.push(enriched);
}
Ok(())
}

async fn worker_task(
pool: Pool<PostgresConnectionManager<NoTls>>,
enriched_queries: Vec<EnrichedBenchmarkQuery>,
metrics: MetricsCollector,
deadline: Instant,
) -> Result<()> {
let client = pool.get().await?;
let mut query_rng = rand::thread_rng();
let mut row_rng = rand::thread_rng();
let mut query_rng = rand::rngs::StdRng::from_entropy();
let mut row_rng = rand::rngs::StdRng::from_entropy();
while Instant::now() < deadline {
let enriched = enriched_queries
.choose(&mut query_rng)
Expand Down Expand Up @@ -174,23 +84,19 @@ impl QueryExecutor {
let start = Instant::now();
let result = client.query(&query_str, &param_refs[..]).await;

metrics.record_query(&enriched.query.table_name, start.elapsed(), result.is_err());
metrics.record_query(enriched.query.clone(), start.elapsed(), result.is_err());
}
Ok(())
}

pub async fn run(&mut self) -> Result<BenchmarkResult> {
if self.enriched_queries.is_empty() {
self.initialize_samples().await?;
}

pub async fn run(&self) -> Result<BenchmarkResult> {
info!(
"Running benchmark with {} concurrent clients",
self.config.concurrency
);

let start = Instant::now();
let deadline = start + self.config.duration;
let deadline = start + self.config.timeout;
let (concurrency, metrics, pool, queries) = (
self.config.concurrency,
self.metrics.clone(),
Expand Down
Loading

0 comments on commit 592ae3d

Please sign in to comment.