Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Add InstalledScheduler for blockstore_processor
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Oct 26, 2023
1 parent 70107e2 commit 70e1654
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 41 deletions.
40 changes: 40 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ memmap2 = "0.5.10"
memoffset = "0.9"
merlin = "3"
min-max-heap = "1.3.0"
mockall = "0.11.4"
modular-bitfield = "0.11.2"
nix = "0.26.4"
num-bigint = "0.4.4"
Expand Down
165 changes: 131 additions & 34 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,55 @@ fn execute_batches_internal(
})
}

fn process_batches(
bank: &BankWithScheduler,
batches: &[TransactionBatchWithIndexes],
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
batch_execution_timing: &mut BatchExecutionTiming,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> Result<()> {
if !bank.has_installed_scheduler() {
debug!(
"process_batches()/rebatch_and_execute_batches({} batches)",
batches.len()
);
rebatch_and_execute_batches(
bank,
batches,
transaction_status_sender,
replay_vote_sender,
batch_execution_timing,
log_messages_bytes_limit,
prioritization_fee_cache,
)
} else {
debug!(
"process_batches()/schedule_batches_for_execution({} batches)",
batches.len()
);
schedule_batches_for_execution(bank, batches)
}
}

fn schedule_batches_for_execution(
bank: &BankWithScheduler,
batches: &[TransactionBatchWithIndexes],
) -> Result<()> {
for TransactionBatchWithIndexes {
batch,
transaction_indexes,
} in batches
{
bank.schedule_transaction_executions(
batch.sanitized_transactions(),
transaction_indexes.iter(),
);
}
Ok(())
}

fn rebatch_transactions<'a>(
lock_results: &'a [Result<()>],
bank: &'a Arc<Bank>,
Expand All @@ -314,7 +363,7 @@ fn rebatch_transactions<'a>(
}
}

fn execute_batches(
fn rebatch_and_execute_batches(
bank: &Arc<Bank>,
batches: &[TransactionBatchWithIndexes],
transaction_status_sender: Option<&TransactionStatusSender>,
Expand Down Expand Up @@ -488,7 +537,7 @@ fn process_entries(
if bank.is_block_boundary(bank.tick_height() + tick_hashes.len() as u64) {
// If it's a tick that will cause a new blockhash to be created,
// execute the group and register the tick
execute_batches(
process_batches(
bank,
&batches,
transaction_status_sender,
Expand Down Expand Up @@ -541,7 +590,7 @@ fn process_entries(
} else {
// else we have an entry that conflicts with a prior entry
// execute the current queue and try to process this entry again
execute_batches(
process_batches(
bank,
&batches,
transaction_status_sender,
Expand All @@ -556,7 +605,7 @@ fn process_entries(
}
}
}
execute_batches(
process_batches(
bank,
&batches,
transaction_status_sender,
Expand Down Expand Up @@ -1856,8 +1905,11 @@ pub mod tests {
rand::{thread_rng, Rng},
solana_entry::entry::{create_ticks, next_entry, next_entry_mut},
solana_program_runtime::declare_process_instruction,
solana_runtime::genesis_utils::{
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
solana_runtime::{
genesis_utils::{
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
},
installed_scheduler_pool::MockInstalledScheduler,
},
solana_sdk::{
account::{AccountSharedData, WritableAccount},
Expand Down Expand Up @@ -4245,6 +4297,38 @@ pub mod tests {
)
}

fn create_test_transactions(
mint_keypair: &Keypair,
genesis_hash: &Hash,
) -> Vec<SanitizedTransaction> {
let pubkey = solana_sdk::pubkey::new_rand();
let keypair2 = Keypair::new();
let pubkey2 = solana_sdk::pubkey::new_rand();
let keypair3 = Keypair::new();
let pubkey3 = solana_sdk::pubkey::new_rand();

vec![
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
mint_keypair,
&pubkey,
1,
*genesis_hash,
)),
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&keypair2,
&pubkey2,
1,
*genesis_hash,
)),
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&keypair3,
&pubkey3,
1,
*genesis_hash,
)),
]
}

#[test]
fn test_confirm_slot_entries_progress_num_txs_indexes() {
let GenesisConfigInfo {
Expand Down Expand Up @@ -4368,34 +4452,7 @@ pub mod tests {
..
} = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));

let pubkey = solana_sdk::pubkey::new_rand();
let keypair2 = Keypair::new();
let pubkey2 = solana_sdk::pubkey::new_rand();
let keypair3 = Keypair::new();
let pubkey3 = solana_sdk::pubkey::new_rand();

let txs = vec![
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_config.hash(),
)),
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&keypair2,
&pubkey2,
1,
genesis_config.hash(),
)),
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&keypair3,
&pubkey3,
1,
genesis_config.hash(),
)),
];

let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
let batch = bank.prepare_sanitized_batch(&txs);
assert!(batch.needs_unlock());
let transaction_indexes = vec![42, 43, 44];
Expand Down Expand Up @@ -4424,6 +4481,46 @@ pub mod tests {
assert_eq!(batch3.transaction_indexes, vec![43, 44]);
}

#[test]
fn test_schedule_batches_for_execution() {
solana_logger::setup();
let dummy_leader_pubkey = solana_sdk::pubkey::new_rand();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));

let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());

let mut mocked_scheduler = MockInstalledScheduler::new();
mocked_scheduler
.expect_schedule_execution()
.times(txs.len())
.returning(|_| ());
let bank = BankWithScheduler::new(bank, Some(Box::new(mocked_scheduler)));

let batch = bank.prepare_sanitized_batch(&txs);
let batch_with_indexes = TransactionBatchWithIndexes {
batch,
transaction_indexes: (0..txs.len()).collect(),
};

let mut batch_execution_timing = BatchExecutionTiming::default();
let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
assert!(process_batches(
&bank,
&[batch_with_indexes],
None,
None,
&mut batch_execution_timing,
None,
&ignored_prioritization_fee_cache
)
.is_ok());
}

#[test]
fn test_confirm_slot_entries_with_fix() {
const HASHES_PER_TICK: u64 = 10;
Expand Down
Loading

0 comments on commit 70e1654

Please sign in to comment.