diff --git a/bin/autobahn-router/src/ix_builder.rs b/bin/autobahn-router/src/ix_builder.rs index 0579799..71841ba 100644 --- a/bin/autobahn-router/src/ix_builder.rs +++ b/bin/autobahn-router/src/ix_builder.rs @@ -265,7 +265,6 @@ mod tests { async fn initialize( _rpc: &mut RouterRpcClient, _options: HashMap, - _enable_compression: bool, ) -> anyhow::Result> where Self: Sized, diff --git a/bin/autobahn-router/src/main.rs b/bin/autobahn-router/src/main.rs index 7379748..203c5d9 100644 --- a/bin/autobahn-router/src/main.rs +++ b/bin/autobahn-router/src/main.rs @@ -235,16 +235,18 @@ async fn main() -> anyhow::Result<()> { ); cropper.insert("program_name".to_string(), "Cropper".to_string()); - let enable_compression = source_config.rpc_support_compression.unwrap_or_default(); + let gpa_compression_enabled = source_config.rpc_support_compression.unwrap_or_default(); let mut router_rpc = RouterRpcClient { rpc: Box::new(RouterRpcWrapper { rpc: build_rpc(&source_config), + gpa_compression_enabled, }), + gpa_compression_enabled, }; let dexs: Vec = [ dex::generic::build_dex!( - OrcaDex::initialize(&mut router_rpc, orca_config, enable_compression).await?, + OrcaDex::initialize(&mut router_rpc, orca_config).await?, &mango_data, config.orca.enabled, config.orca.add_mango_tokens, @@ -252,7 +254,7 @@ async fn main() -> anyhow::Result<()> { &config.orca.mints ), dex::generic::build_dex!( - OrcaDex::initialize(&mut router_rpc, cropper, enable_compression).await?, + OrcaDex::initialize(&mut router_rpc, cropper).await?, &mango_data, config.cropper.enabled, config.cropper.add_mango_tokens, @@ -260,8 +262,7 @@ async fn main() -> anyhow::Result<()> { &config.cropper.mints ), dex::generic::build_dex!( - dex_saber::SaberDex::initialize(&mut router_rpc, HashMap::new(), enable_compression) - .await?, + dex_saber::SaberDex::initialize(&mut router_rpc, HashMap::new()).await?, &mango_data, config.saber.enabled, config.saber.add_mango_tokens, @@ -269,12 +270,7 @@ async fn main() -> anyhow::Result<()> { &config.saber.mints ), dex::generic::build_dex!( - dex_raydium_cp::RaydiumCpDex::initialize( - &mut router_rpc, - HashMap::new(), - enable_compression - ) - .await?, + dex_raydium_cp::RaydiumCpDex::initialize(&mut router_rpc, HashMap::new(),).await?, &mango_data, config.raydium_cp.enabled, config.raydium_cp.add_mango_tokens, @@ -282,12 +278,7 @@ async fn main() -> anyhow::Result<()> { &config.raydium_cp.mints ), dex::generic::build_dex!( - dex_raydium::RaydiumDex::initialize( - &mut router_rpc, - HashMap::new(), - enable_compression - ) - .await?, + dex_raydium::RaydiumDex::initialize(&mut router_rpc, HashMap::new(),).await?, &mango_data, config.raydium.enabled, config.raydium.add_mango_tokens, @@ -295,12 +286,7 @@ async fn main() -> anyhow::Result<()> { &config.raydium.mints ), dex::generic::build_dex!( - dex_openbook_v2::OpenbookV2Dex::initialize( - &mut router_rpc, - HashMap::new(), - enable_compression - ) - .await?, + dex_openbook_v2::OpenbookV2Dex::initialize(&mut router_rpc, HashMap::new(),).await?, &mango_data, config.openbook_v2.enabled, config.openbook_v2.add_mango_tokens, @@ -308,12 +294,7 @@ async fn main() -> anyhow::Result<()> { &config.openbook_v2.mints ), dex::generic::build_dex!( - dex_infinity::InfinityDex::initialize( - &mut router_rpc, - HashMap::new(), - enable_compression - ) - .await?, + dex_infinity::InfinityDex::initialize(&mut router_rpc, HashMap::new(),).await?, &mango_data, config.infinity.enabled, false, @@ -321,12 +302,7 @@ async fn main() -> anyhow::Result<()> { &vec![] ), dex::generic::build_dex!( - dex_invariant::InvariantDex::initialize( - &mut router_rpc, - HashMap::new(), - enable_compression - ) - .await?, + dex_invariant::InvariantDex::initialize(&mut router_rpc, HashMap::new(),).await?, &mango_data, config.invariant.enabled, config.invariant.take_all_mints, diff --git a/bin/autobahn-router/src/mock.rs b/bin/autobahn-router/src/mock.rs index 432e233..c26aa46 100644 --- a/bin/autobahn-router/src/mock.rs +++ b/bin/autobahn-router/src/mock.rs @@ -56,7 +56,6 @@ pub mod test { async fn initialize( _rpc: &mut RouterRpcClient, _options: HashMap, - _enable_compression: bool, ) -> anyhow::Result> where Self: Sized, diff --git a/bin/autobahn-router/src/tests/dex_test_utils.rs b/bin/autobahn-router/src/tests/dex_test_utils.rs index 07151d0..cd90b4d 100644 --- a/bin/autobahn-router/src/tests/dex_test_utils.rs +++ b/bin/autobahn-router/src/tests/dex_test_utils.rs @@ -6,7 +6,6 @@ use std::sync::Arc; pub async fn get_all_dex( mut rpc_client: &mut RouterRpcClient, - enable_compression: bool, ) -> anyhow::Result>> { let orca_config = HashMap::from([ ( @@ -24,26 +23,14 @@ pub async fn get_all_dex( ]); let dexs = [ - dex_orca::OrcaDex::initialize(&mut rpc_client, orca_config, enable_compression).await?, - dex_orca::OrcaDex::initialize(&mut rpc_client, cropper_config, enable_compression).await?, - dex_saber::SaberDex::initialize(&mut rpc_client, HashMap::new(), enable_compression) - .await?, - dex_raydium_cp::RaydiumCpDex::initialize( - &mut rpc_client, - HashMap::new(), - enable_compression, - ) - .await?, - dex_raydium::RaydiumDex::initialize(&mut rpc_client, HashMap::new(), enable_compression) - .await?, - dex_openbook_v2::OpenbookV2Dex::initialize( - &mut rpc_client, - HashMap::new(), - enable_compression, - ) - .await?, - dex_infinity::InfinityDex::initialize(&mut rpc_client, HashMap::new(), enable_compression) - .await?, + dex_orca::OrcaDex::initialize(&mut rpc_client, orca_config).await?, + dex_orca::OrcaDex::initialize(&mut rpc_client, cropper_config).await?, + dex_saber::SaberDex::initialize(&mut rpc_client, HashMap::new()).await?, + dex_raydium_cp::RaydiumCpDex::initialize(&mut rpc_client, HashMap::new()).await?, + dex_raydium::RaydiumDex::initialize(&mut rpc_client, HashMap::new()).await?, + dex_openbook_v2::OpenbookV2Dex::initialize(&mut rpc_client, HashMap::new()).await?, + dex_infinity::InfinityDex::initialize(&mut rpc_client, HashMap::new()).await?, + dex_invariant::InvariantDex::initialize(&mut rpc_client, HashMap::new()).await?, ]; Ok(dexs.into_iter().collect()) diff --git a/bin/autobahn-router/src/tests/dump_all_dex.rs b/bin/autobahn-router/src/tests/dump_all_dex.rs index c0084a3..3d1d0ce 100644 --- a/bin/autobahn-router/src/tests/dump_all_dex.rs +++ b/bin/autobahn-router/src/tests/dump_all_dex.rs @@ -21,14 +21,15 @@ mod tests { router_feed_lib::utils::tracing_subscriber_init(); syscallstubs::deactivate_program_logs(); - let rpc_url = env::var("RPC_HTTP_URL")?; - let (mut rpc_client, _chain_data) = rpc::rpc_dumper_client(rpc_url, "all.lz4"); - let disable_compressed = std::env::var::("DISABLE_COMRPESSED_GPA".to_string()) .unwrap_or("false".to_string()); let disable_compressed: bool = disable_compressed.trim().parse().unwrap(); - let dexs = dex_test_utils::get_all_dex(&mut rpc_client, !disable_compressed).await?; + let rpc_url = env::var("RPC_HTTP_URL")?; + let (mut rpc_client, _chain_data) = + rpc::rpc_dumper_client(rpc_url, "all.lz4", !disable_compressed); + + let dexs = dex_test_utils::get_all_dex(&mut rpc_client).await?; for dex in &dexs { rpc::load_subscriptions(&mut rpc_client, dex.clone()).await?; diff --git a/bin/autobahn-router/src/tests/performance_tests.rs b/bin/autobahn-router/src/tests/performance_tests.rs index 9cfc411..946b188 100644 --- a/bin/autobahn-router/src/tests/performance_tests.rs +++ b/bin/autobahn-router/src/tests/performance_tests.rs @@ -36,13 +36,9 @@ mod tests { let _vsol = Pubkey::from_str("vSoLxydx6akxyMD9XEcPvGYNGq6Nn66oqVb3UkGkei7").unwrap(); let mnde = Pubkey::from_str("MNDEFzGvMt87ueuHvVU9VcTqsAP5b3fTGPsHuuPA5ey").unwrap(); - let disable_compressed = std::env::var::("DISABLE_COMRPESSED_GPA".to_string()) - .unwrap_or("false".to_string()); - let disable_compressed: bool = disable_compressed.trim().parse().unwrap(); - let (mut rpc_client, chain_data) = rpc::rpc_replayer_client("all.lz4"); let chain_data = Arc::new(ChainDataAccountProvider::new(chain_data)) as AccountProviderView; - let dex_sources = dex_test_utils::get_all_dex(&mut rpc_client, !disable_compressed).await?; + let dex_sources = dex_test_utils::get_all_dex(&mut rpc_client).await?; let mut dexs = vec![]; for dex in dex_sources { dexs.push( diff --git a/bin/autobahn-router/src/tests/warmup_performance_tests.rs b/bin/autobahn-router/src/tests/warmup_performance_tests.rs index 5063428..66f5045 100644 --- a/bin/autobahn-router/src/tests/warmup_performance_tests.rs +++ b/bin/autobahn-router/src/tests/warmup_performance_tests.rs @@ -24,13 +24,9 @@ mod tests { syscallstubs::deactivate_program_logs(); - let disable_compressed = std::env::var::("DISABLE_COMRPESSED_GPA".to_string()) - .unwrap_or("false".to_string()); - let disable_compressed: bool = disable_compressed.trim().parse().unwrap(); - let (mut rpc_client, chain_data) = rpc::rpc_replayer_client("all.lz4"); let chain_data = Arc::new(ChainDataAccountProvider::new(chain_data)) as AccountProviderView; - let dex_sources = dex_test_utils::get_all_dex(&mut rpc_client, !disable_compressed).await?; + let dex_sources = dex_test_utils::get_all_dex(&mut rpc_client).await?; let mut dexs = vec![]; for dex in dex_sources { dexs.push( diff --git a/lib/dex-infinity/src/infinity.rs b/lib/dex-infinity/src/infinity.rs index 70d3b99..9f4012c 100644 --- a/lib/dex-infinity/src/infinity.rs +++ b/lib/dex-infinity/src/infinity.rs @@ -36,7 +36,6 @@ impl DexInterface for InfinityDex { async fn initialize( rpc: &mut RouterRpcClient, _options: HashMap, - _enable_compression: bool, ) -> anyhow::Result> where Self: Sized, diff --git a/lib/dex-infinity/tests/test_infinity.rs b/lib/dex-infinity/tests/test_infinity.rs index 56cbf30..9bb40f9 100644 --- a/lib/dex-infinity/tests/test_infinity.rs +++ b/lib/dex-infinity/tests/test_infinity.rs @@ -16,30 +16,29 @@ async fn test_dump_input_data_infinity() -> anyhow::Result<()> { step_1_infinity(!disable_compressed).await?; } - step_2_infinity(!disable_compressed).await?; + step_2_infinity().await?; Ok(()) } async fn step_1_infinity(enable_compression: bool) -> anyhow::Result<()> { let rpc_url = env::var("RPC_HTTP_URL")?; - let (mut rpc_client, chain_data) = rpc::rpc_dumper_client(rpc_url, "infinity_dump.lz4"); + let (mut rpc_client, chain_data) = + rpc::rpc_dumper_client(rpc_url, "infinity_dump.lz4", enable_compression); let options = HashMap::from([]); - let dex = - dex_infinity::InfinityDex::initialize(&mut rpc_client, options, enable_compression).await?; + let dex = dex_infinity::InfinityDex::initialize(&mut rpc_client, options).await?; generate_dex_rpc_dump::run_dump_mainnet_data(dex, rpc_client, chain_data).await?; Ok(()) } -async fn step_2_infinity(enable_compression: bool) -> anyhow::Result<()> { +async fn step_2_infinity() -> anyhow::Result<()> { let (mut rpc_client, chain_data) = rpc::rpc_replayer_client("infinity_dump.lz4"); let options = HashMap::from([]); - let dex = - dex_infinity::InfinityDex::initialize(&mut rpc_client, options, enable_compression).await?; + let dex = dex_infinity::InfinityDex::initialize(&mut rpc_client, options).await?; generate_dex_rpc_dump::run_dump_swap_ix("infinity_swap.lz4", dex, chain_data).await?; diff --git a/lib/dex-invariant/src/invariant_dex.rs b/lib/dex-invariant/src/invariant_dex.rs index 8641b54..d904965 100644 --- a/lib/dex-invariant/src/invariant_dex.rs +++ b/lib/dex-invariant/src/invariant_dex.rs @@ -231,12 +231,11 @@ impl DexInterface for InvariantDex { async fn initialize( rpc: &mut RouterRpcClient, _options: HashMap, - enable_compression: bool, ) -> anyhow::Result> where Self: Sized, { - let mut pools = fetch_invariant_accounts(rpc, crate::id(), enable_compression).await?; + let mut pools = fetch_invariant_accounts(rpc, crate::id()).await?; let reserves = pools .iter() @@ -461,7 +460,6 @@ impl DexInterface for InvariantDex { async fn fetch_invariant_accounts( rpc: &mut RouterRpcClient, program_id: Pubkey, - compression_enabled: bool, ) -> anyhow::Result> { let config = RpcProgramAccountsConfig { filters: Some(vec![RpcFilterType::DataSize(Pool::LEN as u64)]), @@ -473,7 +471,7 @@ async fn fetch_invariant_accounts( }; let snapshot = rpc - .get_program_accounts_with_config(&program_id, config, compression_enabled) + .get_program_accounts_with_config(&program_id, config) .await?; let result = snapshot diff --git a/lib/dex-invariant/tests/test_invariant.rs b/lib/dex-invariant/tests/test_invariant.rs index a0d20f7..e329870 100644 --- a/lib/dex-invariant/tests/test_invariant.rs +++ b/lib/dex-invariant/tests/test_invariant.rs @@ -16,7 +16,7 @@ async fn test_dump_input_data_invariant() -> anyhow::Result<()> { invariant_step_1(&options, !disable_compressed).await?; } - invariant_step_2(&options, !disable_compressed).await?; + invariant_step_2(&options).await?; Ok(()) } @@ -27,31 +27,19 @@ async fn invariant_step_1( ) -> anyhow::Result<()> { let rpc_url = env::var("RPC_HTTP_URL")?; - let (mut rpc_client, chain_data) = rpc::rpc_dumper_client(rpc_url, "invariant_swap.lz4"); - let dex = dex_invariant::InvariantDex::initialize( - &mut rpc_client, - options.clone(), - enable_compression, - ) - .await?; + let (mut rpc_client, chain_data) = + rpc::rpc_dumper_client(rpc_url, "invariant_swap.lz4", enable_compression); + let dex = dex_invariant::InvariantDex::initialize(&mut rpc_client, options.clone()).await?; generate_dex_rpc_dump::run_dump_mainnet_data(dex, rpc_client, chain_data).await?; Ok(()) } -async fn invariant_step_2( - options: &HashMap, - enable_compression: bool, -) -> anyhow::Result<()> { +async fn invariant_step_2(options: &HashMap) -> anyhow::Result<()> { let (mut rpc_client, chain_data) = rpc::rpc_replayer_client("invariant_swap.lz4"); - let dex = dex_invariant::InvariantDex::initialize( - &mut rpc_client, - options.clone(), - enable_compression, - ) - .await?; + let dex = dex_invariant::InvariantDex::initialize(&mut rpc_client, options.clone()).await?; generate_dex_rpc_dump::run_dump_swap_ix("invariant_swap.lz4", dex, chain_data).await?; diff --git a/lib/dex-openbook-v2/src/openbook_v2_dex.rs b/lib/dex-openbook-v2/src/openbook_v2_dex.rs index 37e96da..82062ca 100644 --- a/lib/dex-openbook-v2/src/openbook_v2_dex.rs +++ b/lib/dex-openbook-v2/src/openbook_v2_dex.rs @@ -32,12 +32,11 @@ impl DexInterface for OpenbookV2Dex { async fn initialize( rpc: &mut RouterRpcClient, _options: HashMap, - enable_compression: bool, ) -> anyhow::Result> where Self: Sized, { - let markets = fetch_openbook_v2_account(rpc, openbook_v2::id(), enable_compression) + let markets = fetch_openbook_v2_account(rpc, openbook_v2::id()) .await? .into_iter() .filter(|x| x.1.open_orders_admin.is_none()) @@ -405,7 +404,6 @@ impl DexInterface for OpenbookV2Dex { async fn fetch_openbook_v2_account( rpc: &mut RouterRpcClient, program_id: Pubkey, - enable_compression: bool, ) -> anyhow::Result> { let config = RpcProgramAccountsConfig { filters: Some(vec![ @@ -424,7 +422,7 @@ async fn fetch_openbook_v2_account( }; let snapshot = rpc - .get_program_accounts_with_config(&program_id, config, enable_compression) // todo use compression here + .get_program_accounts_with_config(&program_id, config) // todo use compression here .await?; let result = snapshot diff --git a/lib/dex-openbook-v2/tests/test_openbook_v2_cp.rs b/lib/dex-openbook-v2/tests/test_openbook_v2_cp.rs index ec14f4a..f7de8e8 100644 --- a/lib/dex-openbook-v2/tests/test_openbook_v2_cp.rs +++ b/lib/dex-openbook-v2/tests/test_openbook_v2_cp.rs @@ -14,10 +14,10 @@ async fn test_dump_input_data_openbook_v2() -> anyhow::Result<()> { let disable_compressed: bool = disable_compressed.trim().parse().unwrap(); if router_test_lib::config_should_dump_mainnet_data() { - openbook_v2_step_1(&options, disable_compressed).await?; + openbook_v2_step_1(&options, !disable_compressed).await?; } - openbook_v2_step_2(&options, disable_compressed).await?; + openbook_v2_step_2(&options).await?; Ok(()) } @@ -28,14 +28,10 @@ async fn openbook_v2_step_1( ) -> anyhow::Result<()> { let rpc_url: String = env::var("RPC_HTTP_URL")?; - let (mut rpc_client, chain_data) = rpc::rpc_dumper_client(rpc_url, "openbook_v2_dump.lz4"); + let (mut rpc_client, chain_data) = + rpc::rpc_dumper_client(rpc_url, "openbook_v2_dump.lz4", enable_compression); - let dex = dex_openbook_v2::OpenbookV2Dex::initialize( - &mut rpc_client, - options.clone(), - enable_compression, - ) - .await?; + let dex = dex_openbook_v2::OpenbookV2Dex::initialize(&mut rpc_client, options.clone()).await?; generate_dex_rpc_dump::run_dump_mainnet_data_with_custom_amount( dex, @@ -51,19 +47,11 @@ async fn openbook_v2_step_1( Ok(()) } -async fn openbook_v2_step_2( - options: &HashMap, - enable_compression: bool, -) -> anyhow::Result<()> { +async fn openbook_v2_step_2(options: &HashMap) -> anyhow::Result<()> { // Replay let (mut rpc_client, chain_data) = rpc::rpc_replayer_client("openbook_v2_dump.lz4"); - let dex = dex_openbook_v2::OpenbookV2Dex::initialize( - &mut rpc_client, - options.clone(), - enable_compression, - ) - .await?; + let dex = dex_openbook_v2::OpenbookV2Dex::initialize(&mut rpc_client, options.clone()).await?; generate_dex_rpc_dump::run_dump_swap_ix_with_custom_amount( "openbook_v2_swap.lz4", diff --git a/lib/dex-orca/src/orca.rs b/lib/dex-orca/src/orca.rs index dce8ffe..c415644 100644 --- a/lib/dex-orca/src/orca.rs +++ b/lib/dex-orca/src/orca.rs @@ -289,7 +289,6 @@ pub fn simulate_swap_with_tick_array( pub async fn fetch_all_whirlpools( rpc: &mut RouterRpcClient, program_id: &Pubkey, - enable_compression: bool, ) -> anyhow::Result> { let config = RpcProgramAccountsConfig { filters: Some(vec![ @@ -307,7 +306,7 @@ pub async fn fetch_all_whirlpools( ..Default::default() }; let whirlpools = rpc - .get_program_accounts_with_config(program_id, config, enable_compression) + .get_program_accounts_with_config(program_id, config) .await?; let result = whirlpools .iter() diff --git a/lib/dex-orca/src/orca_dex.rs b/lib/dex-orca/src/orca_dex.rs index 5feaad8..fd7faa9 100644 --- a/lib/dex-orca/src/orca_dex.rs +++ b/lib/dex-orca/src/orca_dex.rs @@ -53,7 +53,6 @@ impl DexInterface for OrcaDex { async fn initialize( rpc: &mut RouterRpcClient, options: HashMap, - enable_compression: bool, ) -> anyhow::Result> where Self: Sized, @@ -65,13 +64,7 @@ impl DexInterface for OrcaDex { }; result.edges.extend( - Self::load_edge_identifiers( - rpc, - &result.program_name, - &result.program_id, - enable_compression, - ) - .await?, + Self::load_edge_identifiers(rpc, &result.program_name, &result.program_id).await?, ); Ok(Arc::new(result)) @@ -223,9 +216,8 @@ impl OrcaDex { rpc: &mut RouterRpcClient, program_name: &str, program_id: &Pubkey, - enable_compression: bool, ) -> anyhow::Result>>> { - let whirlpools = fetch_all_whirlpools(rpc, program_id, enable_compression).await?; + let whirlpools = fetch_all_whirlpools(rpc, program_id).await?; let vaults = whirlpools .iter() diff --git a/lib/dex-orca/tests/test_cropper.rs b/lib/dex-orca/tests/test_cropper.rs index acd71be..9274f95 100644 --- a/lib/dex-orca/tests/test_cropper.rs +++ b/lib/dex-orca/tests/test_cropper.rs @@ -35,7 +35,7 @@ async fn test_dump_input_data_cropper() -> anyhow::Result<()> { cropper_step_1(&options, !disable_compressed).await?; } - cropper_step_2(&options, !disable_compressed).await?; + cropper_step_2(&options).await?; Ok(()) } @@ -46,25 +46,21 @@ async fn cropper_step_1( ) -> anyhow::Result<()> { let rpc_url = env::var("RPC_HTTP_URL")?; - let (mut rpc_client, chain_data) = rpc::rpc_dumper_client(rpc_url, "cropper_dump.lz4"); + let (mut rpc_client, chain_data) = + rpc::rpc_dumper_client(rpc_url, "cropper_dump.lz4", enable_compression); - let dex = - dex_orca::OrcaDex::initialize(&mut rpc_client, options.clone(), enable_compression).await?; + let dex = dex_orca::OrcaDex::initialize(&mut rpc_client, options.clone()).await?; generate_dex_rpc_dump::run_dump_mainnet_data(dex, rpc_client, chain_data).await?; Ok(()) } -async fn cropper_step_2( - options: &HashMap, - enable_compression: bool, -) -> anyhow::Result<()> { +async fn cropper_step_2(options: &HashMap) -> anyhow::Result<()> { // Replay let (mut rpc_client, chain_data) = rpc::rpc_replayer_client("cropper_dump.lz4"); - let dex = - dex_orca::OrcaDex::initialize(&mut rpc_client, options.clone(), enable_compression).await?; + let dex = dex_orca::OrcaDex::initialize(&mut rpc_client, options.clone()).await?; generate_dex_rpc_dump::run_dump_swap_ix("cropper_swap.lz4", dex, chain_data).await?; diff --git a/lib/dex-orca/tests/test_orca.rs b/lib/dex-orca/tests/test_orca.rs index 307e645..fa35f7d 100644 --- a/lib/dex-orca/tests/test_orca.rs +++ b/lib/dex-orca/tests/test_orca.rs @@ -21,7 +21,7 @@ async fn test_dump_input_data_orca() -> anyhow::Result<()> { orca_step_1(&options, !disable_compressed).await?; } - orca_step_2(&options, !disable_compressed).await?; + orca_step_2(&options).await?; Ok(()) } @@ -31,24 +31,20 @@ async fn orca_step_1( enable_compression: bool, ) -> anyhow::Result<()> { let rpc_url = env::var("RPC_HTTP_URL")?; - let (mut rpc_client, chain_data) = rpc::rpc_dumper_client(rpc_url, "orca_dump.lz4"); + let (mut rpc_client, chain_data) = + rpc::rpc_dumper_client(rpc_url, "orca_dump.lz4", enable_compression); - let dex = - dex_orca::OrcaDex::initialize(&mut rpc_client, options.clone(), enable_compression).await?; + let dex = dex_orca::OrcaDex::initialize(&mut rpc_client, options.clone()).await?; generate_dex_rpc_dump::run_dump_mainnet_data(dex, rpc_client, chain_data).await?; Ok(()) } -async fn orca_step_2( - options: &HashMap, - enable_compression: bool, -) -> anyhow::Result<()> { +async fn orca_step_2(options: &HashMap) -> anyhow::Result<()> { let (mut rpc_client, chain_data) = rpc::rpc_replayer_client("orca_dump.lz4"); - let dex = - dex_orca::OrcaDex::initialize(&mut rpc_client, options.clone(), enable_compression).await?; + let dex = dex_orca::OrcaDex::initialize(&mut rpc_client, options.clone()).await?; generate_dex_rpc_dump::run_dump_swap_ix("orca_swap.lz4", dex, chain_data).await?; diff --git a/lib/dex-raydium-cp/src/raydium_cp.rs b/lib/dex-raydium-cp/src/raydium_cp.rs index c5178fe..0ef2454 100644 --- a/lib/dex-raydium-cp/src/raydium_cp.rs +++ b/lib/dex-raydium-cp/src/raydium_cp.rs @@ -39,18 +39,12 @@ impl DexInterface for RaydiumCpDex { async fn initialize( rpc: &mut RouterRpcClient, _options: HashMap, - enable_compression: bool, ) -> anyhow::Result> where Self: Sized, { - let pools = fetch_raydium_account::( - rpc, - RaydiumCpSwap::id(), - PoolState::LEN, - enable_compression, - ) - .await?; + let pools = + fetch_raydium_account::(rpc, RaydiumCpSwap::id(), PoolState::LEN).await?; let vaults = pools .iter() @@ -377,7 +371,6 @@ async fn fetch_raydium_account( rpc: &mut RouterRpcClient, program_id: Pubkey, len: usize, - enable_compression: bool, ) -> anyhow::Result> { let config = RpcProgramAccountsConfig { filters: Some(vec![ @@ -393,7 +386,7 @@ async fn fetch_raydium_account( }; let snapshot = rpc - .get_program_accounts_with_config(&program_id, config, enable_compression) + .get_program_accounts_with_config(&program_id, config) .await?; let result = snapshot diff --git a/lib/dex-raydium-cp/tests/test_raydium_cp.rs b/lib/dex-raydium-cp/tests/test_raydium_cp.rs index 7d789fc..aefa0b1 100644 --- a/lib/dex-raydium-cp/tests/test_raydium_cp.rs +++ b/lib/dex-raydium-cp/tests/test_raydium_cp.rs @@ -18,7 +18,7 @@ async fn test_dump_input_data_raydium_cp() -> anyhow::Result<()> { raydium_cp_step_1(&options, !disable_compressed).await?; } - raydium_cp_step_2(&options, !disable_compressed).await?; + raydium_cp_step_2(&options).await?; Ok(()) } @@ -29,33 +29,21 @@ async fn raydium_cp_step_1( ) -> anyhow::Result<()> { let rpc_url = env::var("RPC_HTTP_URL")?; - let (mut rpc_client, chain_data) = rpc::rpc_dumper_client(rpc_url, "raydium_cp_dump.lz4"); + let (mut rpc_client, chain_data) = + rpc::rpc_dumper_client(rpc_url, "raydium_cp_dump.lz4", enable_compression); - let dex = dex_raydium_cp::RaydiumCpDex::initialize( - &mut rpc_client, - options.clone(), - enable_compression, - ) - .await?; + let dex = dex_raydium_cp::RaydiumCpDex::initialize(&mut rpc_client, options.clone()).await?; generate_dex_rpc_dump::run_dump_mainnet_data(dex, rpc_client, chain_data).await?; Ok(()) } -async fn raydium_cp_step_2( - options: &HashMap, - enable_compression: bool, -) -> anyhow::Result<()> { +async fn raydium_cp_step_2(options: &HashMap) -> anyhow::Result<()> { // Replay let (mut rpc_client, chain_data) = rpc::rpc_replayer_client("raydium_cp_dump.lz4"); - let dex = dex_raydium_cp::RaydiumCpDex::initialize( - &mut rpc_client, - options.clone(), - enable_compression, - ) - .await?; + let dex = dex_raydium_cp::RaydiumCpDex::initialize(&mut rpc_client, options.clone()).await?; generate_dex_rpc_dump::run_dump_swap_ix("raydium_cp_swap.lz4", dex, chain_data).await?; diff --git a/lib/dex-raydium/src/raydium_dex.rs b/lib/dex-raydium/src/raydium_dex.rs index 5a05ccd..cbae476 100644 --- a/lib/dex-raydium/src/raydium_dex.rs +++ b/lib/dex-raydium/src/raydium_dex.rs @@ -34,12 +34,11 @@ impl DexInterface for RaydiumDex { async fn initialize( rpc: &mut RouterRpcClient, _options: HashMap, - enable_compression: bool, ) -> anyhow::Result> where Self: Sized, { - let pools = fetch_raydium_accounts(rpc, crate::id(), enable_compression).await?; + let pools = fetch_raydium_accounts(rpc, crate::id()).await?; info!("Number of raydium AMM: {:?}", pools.len()); @@ -285,7 +284,6 @@ impl DexInterface for RaydiumDex { async fn fetch_raydium_accounts( rpc: &mut RouterRpcClient, program_id: Pubkey, - enable_compression: bool, ) -> anyhow::Result> { let config = RpcProgramAccountsConfig { filters: Some(vec![RpcFilterType::DataSize( @@ -300,7 +298,7 @@ async fn fetch_raydium_accounts( }; let snapshot = rpc - .get_program_accounts_with_config(&program_id, config, enable_compression) + .get_program_accounts_with_config(&program_id, config) .await?; let result = snapshot diff --git a/lib/dex-raydium/tests/test_raydium.rs b/lib/dex-raydium/tests/test_raydium.rs index 140e98b..b99c240 100644 --- a/lib/dex-raydium/tests/test_raydium.rs +++ b/lib/dex-raydium/tests/test_raydium.rs @@ -17,7 +17,7 @@ async fn test_dump_input_data_raydium() -> anyhow::Result<()> { raydium_step_1(&options, !disable_compressed).await?; } - raydium_step_2(&options, !disable_compressed).await?; + raydium_step_2(&options).await?; Ok(()) } @@ -27,26 +27,20 @@ async fn raydium_step_1( enable_compression: bool, ) -> anyhow::Result<()> { let rpc_url = env::var("RPC_HTTP_URL")?; - let (mut rpc_client, chain_data) = rpc::rpc_dumper_client(rpc_url, "raydium_dump.lz4"); + let (mut rpc_client, chain_data) = + rpc::rpc_dumper_client(rpc_url, "raydium_dump.lz4", enable_compression); - let dex = - dex_raydium::RaydiumDex::initialize(&mut rpc_client, options.clone(), enable_compression) - .await?; + let dex = dex_raydium::RaydiumDex::initialize(&mut rpc_client, options.clone()).await?; generate_dex_rpc_dump::run_dump_mainnet_data(dex, rpc_client, chain_data).await?; Ok(()) } -async fn raydium_step_2( - options: &HashMap, - enable_compression: bool, -) -> anyhow::Result<()> { +async fn raydium_step_2(options: &HashMap) -> anyhow::Result<()> { let (mut rpc_client, chain_data) = rpc::rpc_replayer_client("raydium_dump.lz4"); - let dex = - dex_raydium::RaydiumDex::initialize(&mut rpc_client, options.clone(), enable_compression) - .await?; + let dex = dex_raydium::RaydiumDex::initialize(&mut rpc_client, options.clone()).await?; generate_dex_rpc_dump::run_dump_swap_ix("raydium_swap.lz4", dex, chain_data).await?; diff --git a/lib/dex-saber/src/saber.rs b/lib/dex-saber/src/saber.rs index 61d25ea..9f61995 100644 --- a/lib/dex-saber/src/saber.rs +++ b/lib/dex-saber/src/saber.rs @@ -34,18 +34,12 @@ impl DexInterface for SaberDex { async fn initialize( rpc: &mut RouterRpcClient, _options: HashMap, - enable_compression: bool, ) -> anyhow::Result> where Self: Sized, { - let pools = fetch_saber_account::( - rpc, - stable_swap_client::id(), - SwapInfo::LEN, - enable_compression, - ) - .await?; + let pools = + fetch_saber_account::(rpc, stable_swap_client::id(), SwapInfo::LEN).await?; let edge_pairs = pools .iter() @@ -219,7 +213,6 @@ async fn fetch_saber_account( rpc: &mut RouterRpcClient, program_id: Pubkey, len: usize, - enable_compression: bool, ) -> anyhow::Result> { let config = RpcProgramAccountsConfig { filters: Some(vec![RpcFilterType::DataSize(len as u64)]), @@ -232,7 +225,7 @@ async fn fetch_saber_account( }; let snapshot = rpc - .get_program_accounts_with_config(&program_id, config, enable_compression) + .get_program_accounts_with_config(&program_id, config) .await?; let result = snapshot diff --git a/lib/dex-saber/tests/test_saber.rs b/lib/dex-saber/tests/test_saber.rs index 1cc700e..9f41b38 100644 --- a/lib/dex-saber/tests/test_saber.rs +++ b/lib/dex-saber/tests/test_saber.rs @@ -18,7 +18,7 @@ async fn test_dump_input_data_saber() -> anyhow::Result<()> { saber_step_1(&options, !disable_compressed).await?; } - saber_step_2(&options, !disable_compressed).await?; + saber_step_2(&options).await?; Ok(()) } @@ -28,24 +28,20 @@ async fn saber_step_1( enable_compression: bool, ) -> anyhow::Result<()> { let rpc_url = env::var("RPC_HTTP_URL")?; - let (mut rpc_client, chain_data) = rpc::rpc_dumper_client(rpc_url, "saber_dump.lz4"); + let (mut rpc_client, chain_data) = + rpc::rpc_dumper_client(rpc_url, "saber_dump.lz4", enable_compression); - let dex = dex_saber::SaberDex::initialize(&mut rpc_client, options.clone(), enable_compression) - .await?; + let dex = dex_saber::SaberDex::initialize(&mut rpc_client, options.clone()).await?; generate_dex_rpc_dump::run_dump_mainnet_data(dex, rpc_client, chain_data).await?; Ok(()) } -async fn saber_step_2( - options: &HashMap, - enable_compression: bool, -) -> anyhow::Result<()> { +async fn saber_step_2(options: &HashMap) -> anyhow::Result<()> { let (mut rpc_client, chain_data) = rpc::rpc_replayer_client("saber_dump.lz4"); - let dex = dex_saber::SaberDex::initialize(&mut rpc_client, options.clone(), enable_compression) - .await?; + let dex = dex_saber::SaberDex::initialize(&mut rpc_client, options.clone()).await?; generate_dex_rpc_dump::run_dump_swap_ix("saber_swap.lz4", dex, chain_data).await?; diff --git a/lib/router-feed-lib/src/grpc_tx_watcher.rs b/lib/router-feed-lib/src/grpc_tx_watcher.rs index 7747cfd..114e48f 100644 --- a/lib/router-feed-lib/src/grpc_tx_watcher.rs +++ b/lib/router-feed-lib/src/grpc_tx_watcher.rs @@ -14,7 +14,7 @@ use yellowstone_grpc_proto::tonic::{ use async_channel::Sender; use solana_sdk::signature::Signature; use std::time::Instant; -use std::{collections::HashMap, env, str::FromStr, time::Duration}; +use std::{collections::HashMap, env, time::Duration}; use tracing::*; use yellowstone_grpc_proto::prelude::{ diff --git a/lib/router-feed-lib/src/router_rpc_client.rs b/lib/router-feed-lib/src/router_rpc_client.rs index 0b0bd5a..93e87e3 100644 --- a/lib/router-feed-lib/src/router_rpc_client.rs +++ b/lib/router-feed-lib/src/router_rpc_client.rs @@ -18,12 +18,14 @@ pub trait RouterRpcClientTrait: Sync + Send { &mut self, pubkey: &Pubkey, config: RpcProgramAccountsConfig, - compression_enabled: bool, ) -> anyhow::Result>; + + fn is_gpa_compression_enabled(&self) -> bool; } pub struct RouterRpcClient { pub rpc: Box, + pub gpa_compression_enabled: bool, } #[async_trait::async_trait] @@ -43,10 +45,13 @@ impl RouterRpcClientTrait for RouterRpcClient { &mut self, pubkey: &Pubkey, config: RpcProgramAccountsConfig, - compression_enabled: bool, ) -> anyhow::Result> { self.rpc - .get_program_accounts_with_config(pubkey, config, compression_enabled) + .get_program_accounts_with_config(pubkey, config) .await } + + fn is_gpa_compression_enabled(&self) -> bool { + self.gpa_compression_enabled + } } diff --git a/lib/router-feed-lib/src/router_rpc_wrapper.rs b/lib/router-feed-lib/src/router_rpc_wrapper.rs index be8308e..7d95160 100644 --- a/lib/router-feed-lib/src/router_rpc_wrapper.rs +++ b/lib/router-feed-lib/src/router_rpc_wrapper.rs @@ -17,6 +17,7 @@ use crate::router_rpc_client::RouterRpcClientTrait; pub struct RouterRpcWrapper { pub rpc: RpcClient, + pub gpa_compression_enabled: bool, } #[async_trait] @@ -54,20 +55,23 @@ impl RouterRpcClientTrait for RouterRpcWrapper { &mut self, pubkey: &Pubkey, config: RpcProgramAccountsConfig, - compression_enabled: bool, ) -> anyhow::Result> { - if !compression_enabled { + if self.is_gpa_compression_enabled() { Ok( - get_uncompressed_program_account_rpc(&self.rpc, &HashSet::from([*pubkey]), config) + get_compressed_program_account_rpc(&self.rpc, &HashSet::from([*pubkey]), config) .await? .1, ) } else { Ok( - get_compressed_program_account_rpc(&self.rpc, &HashSet::from([*pubkey]), config) + get_uncompressed_program_account_rpc(&self.rpc, &HashSet::from([*pubkey]), config) .await? .1, ) } } + + fn is_gpa_compression_enabled(&self) -> bool { + self.gpa_compression_enabled + } } diff --git a/lib/router-lib/src/dex/interface.rs b/lib/router-lib/src/dex/interface.rs index 045e6f0..d79a560 100644 --- a/lib/router-lib/src/dex/interface.rs +++ b/lib/router-lib/src/dex/interface.rs @@ -167,7 +167,6 @@ pub trait DexInterface: Sync + Send { async fn initialize( rpc: &mut RouterRpcClient, options: HashMap, - enable_compression: bool, ) -> anyhow::Result> where Self: Sized; diff --git a/lib/router-lib/src/test_tools/rpc.rs b/lib/router-lib/src/test_tools/rpc.rs index e87750a..3503cf8 100644 --- a/lib/router-lib/src/test_tools/rpc.rs +++ b/lib/router-lib/src/test_tools/rpc.rs @@ -67,7 +67,6 @@ impl RouterRpcClientTrait for ReplayerRpcClient { &mut self, pubkey: &Pubkey, config: RpcProgramAccountsConfig, - _compression_enabled: bool, ) -> anyhow::Result> { let config_serialized = serde_json::to_string(&config)?; match self @@ -80,6 +79,10 @@ impl RouterRpcClientTrait for ReplayerRpcClient { None => anyhow::bail!("Invalid gpa"), } } + + fn is_gpa_compression_enabled(&self) -> bool { + false + } } #[async_trait::async_trait] @@ -122,12 +125,11 @@ impl RouterRpcClientTrait for DumpRpcClient { &mut self, pubkey: &Pubkey, config: RpcProgramAccountsConfig, - compression_enabled: bool, ) -> anyhow::Result> { let config_serialized = serde_json::to_string(&config)?; match self .rpc - .get_program_accounts_with_config(pubkey, config.clone(), compression_enabled) + .get_program_accounts_with_config(pubkey, config.clone()) .await { Ok(r) => { @@ -152,6 +154,10 @@ impl RouterRpcClientTrait for DumpRpcClient { } } } + + fn is_gpa_compression_enabled(&self) -> bool { + false + } } impl Drop for DumpRpcClient { @@ -160,7 +166,11 @@ impl Drop for DumpRpcClient { } } -pub fn rpc_dumper_client(url: String, out_path: &str) -> (RouterRpcClient, ChainDataArcRw) { +pub fn rpc_dumper_client( + url: String, + out_path: &str, + gpa_compression_enabled: bool, +) -> (RouterRpcClient, ChainDataArcRw) { let chain_data = ChainDataArcRw::new(RwLock::new(ChainData::new())); let rpc_client = RouterRpcClient { rpc: Box::new(DumpRpcClient { @@ -176,10 +186,13 @@ pub fn rpc_dumper_client(url: String, out_path: &str) -> (RouterRpcClient, Chain Duration::from_secs(60 * 20), CommitmentConfig::finalized(), ), + gpa_compression_enabled, }), + gpa_compression_enabled, }, path: out_path.to_string(), }), + gpa_compression_enabled, }; (rpc_client, chain_data) @@ -215,7 +228,10 @@ pub fn rpc_replayer_client(in_path: &str) -> (RouterRpcClient, ChainDataArcRw) { } let rpc = ReplayerRpcClient { dump }; - let replayer = RouterRpcClient { rpc: Box::new(rpc) }; + let replayer = RouterRpcClient { + rpc: Box::new(rpc), + gpa_compression_enabled: false, + }; let chain_data = ChainDataArcRw::new(RwLock::new(chain_data)); (replayer, chain_data) @@ -281,7 +297,6 @@ pub async fn load_subscriptions( account_config: Default::default(), with_context: Some(true), }, - false, ) .await?; } @@ -298,7 +313,6 @@ pub async fn load_subscriptions( account_config: Default::default(), with_context: Some(true), }, - false, ) .await?; } @@ -319,7 +333,6 @@ pub async fn load_subscriptions( account_config: Default::default(), with_context: Some(true), }, - false, ) .await?; } diff --git a/scripts/smoke-test.sh b/scripts/smoke-test.sh index 71a1c7e..e7b8608 100755 --- a/scripts/smoke-test.sh +++ b/scripts/smoke-test.sh @@ -5,7 +5,7 @@ function pad () { [ "$#" -gt 1 ] && [ -n "$2" ] && printf "%$2.${2#-}s" "$1"; } # env settings export DUMP_MAINNET_DATA=1 RUST_LOG=info - +export RPC_HTTP_URL="http://fcs-da1._peer.internal:18899" # define in addition # RPC_HTTP_URL="http://fcs-ams1._peer.internal:18899" # for eclipse