Add InstalledScheduler for blockstore_processor #33875
Merged: ryoqun merged 5 commits into solana-labs:master from ryoqun:minimal-installed-scheduler on Oct 27, 2023.
Commits (5, all by ryoqun):
- 70e1654 Add InstalledScheduler for blockstore_processor
- 9491e33 Reverse if clauses
- 2897280 Add more comments for process_batches()
- 6e63d87 Elaborate comment
- 5d856f9 Simplify schedule_transaction_executions type
ledger/src/blockstore_processor.rs

```diff
@@ -294,6 +294,55 @@ fn execute_batches_internal(
     })
 }
 
+fn process_batches(
+    bank: &BankWithScheduler,
+    batches: &[TransactionBatchWithIndexes],
+    transaction_status_sender: Option<&TransactionStatusSender>,
+    replay_vote_sender: Option<&ReplayVoteSender>,
+    batch_execution_timing: &mut BatchExecutionTiming,
+    log_messages_bytes_limit: Option<usize>,
+    prioritization_fee_cache: &PrioritizationFeeCache,
+) -> Result<()> {
+    if !bank.has_installed_scheduler() {
```
**Inline review thread:** nit: …
**ryoqun:** done: 9491e33
```diff
+        debug!(
+            "process_batches()/rebatch_and_execute_batches({} batches)",
+            batches.len()
+        );
+        rebatch_and_execute_batches(
+            bank,
+            batches,
+            transaction_status_sender,
+            replay_vote_sender,
+            batch_execution_timing,
+            log_messages_bytes_limit,
+            prioritization_fee_cache,
+        )
+    } else {
+        debug!(
+            "process_batches()/schedule_batches_for_execution({} batches)",
+            batches.len()
+        );
+        schedule_batches_for_execution(bank, batches)
+    }
+}
+
+fn schedule_batches_for_execution(
+    bank: &BankWithScheduler,
+    batches: &[TransactionBatchWithIndexes],
+) -> Result<()> {
+    for TransactionBatchWithIndexes {
+        batch,
+        transaction_indexes,
+    } in batches
+    {
+        bank.schedule_transaction_executions(
+            batch.sanitized_transactions(),
+            transaction_indexes.iter(),
+        );
+    }
+    Ok(())
+}
+
 fn rebatch_transactions<'a>(
     lock_results: &'a [Result<()>],
     bank: &'a Arc<Bank>,
```
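The `InstalledScheduler` trait that `schedule_batches_for_execution` programs against lives in solana-runtime's `installed_scheduler_pool` module and is not part of this diff. As a rough, self-contained sketch of the hand-off it implies (the trait shape below is an assumption inferred from this diff and the mock used in the tests, not the actual definition):

```rust
use std::fmt::Debug;

// Local stand-in for solana_sdk::transaction::SanitizedTransaction so
// that this sketch compiles on its own.
#[derive(Debug)]
pub struct SanitizedTransaction;

/// Assumed shape of the scheduler hook: a fire-and-forget hand-off of
/// one transaction together with its index within the entry. Note it
/// neither blocks on execution nor returns a Result.
pub trait InstalledScheduler: Send + Sync + Debug {
    fn schedule_execution(&self, transaction_with_index: &(&SanitizedTransaction, usize));
}
```

Under that reading, `BankWithScheduler::schedule_transaction_executions` just forwards each transaction/index pair to this hook, which is why `schedule_batches_for_execution` can unconditionally return `Ok(())`.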
```diff
@@ -314,7 +363,7 @@ fn rebatch_transactions<'a>(
     }
 }
 
-fn execute_batches(
+fn rebatch_and_execute_batches(
     bank: &Arc<Bank>,
     batches: &[TransactionBatchWithIndexes],
     transaction_status_sender: Option<&TransactionStatusSender>,
@@ -488,7 +537,7 @@ fn process_entries(
         if bank.is_block_boundary(bank.tick_height() + tick_hashes.len() as u64) {
             // If it's a tick that will cause a new blockhash to be created,
             // execute the group and register the tick
-            execute_batches(
+            process_batches(
                 bank,
                 &batches,
                 transaction_status_sender,
@@ -541,7 +590,7 @@ fn process_entries(
         } else {
             // else we have an entry that conflicts with a prior entry
             // execute the current queue and try to process this entry again
-            execute_batches(
+            process_batches(
                 bank,
                 &batches,
                 transaction_status_sender,
@@ -556,7 +605,7 @@ fn process_entries(
             }
         }
     }
-    execute_batches(
+    process_batches(
         bank,
         &batches,
         transaction_status_sender,
```
```diff
@@ -1856,8 +1905,11 @@ pub mod tests {
         rand::{thread_rng, Rng},
         solana_entry::entry::{create_ticks, next_entry, next_entry_mut},
         solana_program_runtime::declare_process_instruction,
-        solana_runtime::genesis_utils::{
-            self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
+        solana_runtime::{
+            genesis_utils::{
+                self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
+            },
+            installed_scheduler_pool::MockInstalledScheduler,
         },
         solana_sdk::{
             account::{AccountSharedData, WritableAccount},
@@ -4245,6 +4297,38 @@ pub mod tests {
         )
     }
 
+    fn create_test_transactions(
+        mint_keypair: &Keypair,
+        genesis_hash: &Hash,
+    ) -> Vec<SanitizedTransaction> {
+        let pubkey = solana_sdk::pubkey::new_rand();
+        let keypair2 = Keypair::new();
+        let pubkey2 = solana_sdk::pubkey::new_rand();
+        let keypair3 = Keypair::new();
+        let pubkey3 = solana_sdk::pubkey::new_rand();
+
+        vec![
+            SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
+                mint_keypair,
+                &pubkey,
+                1,
+                *genesis_hash,
+            )),
+            SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
+                &keypair2,
+                &pubkey2,
+                1,
+                *genesis_hash,
+            )),
+            SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
+                &keypair3,
+                &pubkey3,
+                1,
+                *genesis_hash,
+            )),
+        ]
+    }
+
     #[test]
     fn test_confirm_slot_entries_progress_num_txs_indexes() {
         let GenesisConfigInfo {
```
```diff
@@ -4368,34 +4452,7 @@ pub mod tests {
             ..
         } = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
         let bank = Arc::new(Bank::new_for_tests(&genesis_config));
-
-        let pubkey = solana_sdk::pubkey::new_rand();
-        let keypair2 = Keypair::new();
-        let pubkey2 = solana_sdk::pubkey::new_rand();
-        let keypair3 = Keypair::new();
-        let pubkey3 = solana_sdk::pubkey::new_rand();
-
-        let txs = vec![
-            SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
-                &mint_keypair,
-                &pubkey,
-                1,
-                genesis_config.hash(),
-            )),
-            SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
-                &keypair2,
-                &pubkey2,
-                1,
-                genesis_config.hash(),
-            )),
-            SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
-                &keypair3,
-                &pubkey3,
-                1,
-                genesis_config.hash(),
-            )),
-        ];
-
+        let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
         let batch = bank.prepare_sanitized_batch(&txs);
         assert!(batch.needs_unlock());
         let transaction_indexes = vec![42, 43, 44];
@@ -4424,6 +4481,46 @@ pub mod tests {
         assert_eq!(batch3.transaction_indexes, vec![43, 44]);
     }
 
+    #[test]
+    fn test_schedule_batches_for_execution() {
+        solana_logger::setup();
+        let dummy_leader_pubkey = solana_sdk::pubkey::new_rand();
+        let GenesisConfigInfo {
+            genesis_config,
+            mint_keypair,
+            ..
+        } = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
+        let bank = Arc::new(Bank::new_for_tests(&genesis_config));
+
+        let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
+
+        let mut mocked_scheduler = MockInstalledScheduler::new();
+        mocked_scheduler
+            .expect_schedule_execution()
+            .times(txs.len())
+            .returning(|_| ());
+        let bank = BankWithScheduler::new(bank, Some(Box::new(mocked_scheduler)));
+
+        let batch = bank.prepare_sanitized_batch(&txs);
+        let batch_with_indexes = TransactionBatchWithIndexes {
+            batch,
+            transaction_indexes: (0..txs.len()).collect(),
+        };
+
+        let mut batch_execution_timing = BatchExecutionTiming::default();
+        let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
+        assert!(process_batches(
+            &bank,
+            &[batch_with_indexes],
+            None,
+            None,
+            &mut batch_execution_timing,
+            None,
+            &ignored_prioritization_fee_cache
+        )
+        .is_ok());
+    }
+
     #[test]
     fn test_confirm_slot_entries_with_fix() {
         const HASHES_PER_TICK: u64 = 10;
```
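Judging by its `new()`/`expect_*`/`times()`/`returning()` API, `MockInstalledScheduler` (imported in the test-module hunk above) is a mockall-generated mock, so the expectation chain makes the test fail unless the scheduler receives exactly one `schedule_execution` call per transaction. A minimal self-contained illustration of the same mockall pattern, using a made-up `Scheduler` trait rather than the real one:

```rust
use mockall::automock;

// Made-up stand-in trait; #[automock] generates MockScheduler with
// expect_* methods, just like MockInstalledScheduler above.
#[automock]
trait Scheduler {
    fn schedule_execution(&self, index: usize);
}

#[test]
fn scheduler_receives_one_call_per_transaction() {
    let indexes = vec![0, 1, 2];
    let mut mock = MockScheduler::new();
    // Fail the test unless schedule_execution is called exactly
    // indexes.len() times; the mock body itself does nothing.
    mock.expect_schedule_execution()
        .times(indexes.len())
        .returning(|_| ());
    for i in indexes {
        mock.schedule_execution(i);
    }
    // Expectations are verified when `mock` is dropped here, which is
    // why the real test above needs no explicit verify step either.
}
```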
Review discussion:
**Reviewer:** so this fn is a drop-in replacement for the previous `execute_batches`. In `process_entries` we still call this drop-in at the same times we called `execute_batches`, i.e. tick boundaries, entry conflicts, end of loop. Scheduling doesn't block on execution, and also cannot fail w/ an error - how does replay stage become aware of invalid block state without this fn returning it?
**Reviewer:** with that in mind, i'm curious why the switch was introduced here instead of one layer up. Is there a benefit in collecting the entries until conflict/tick before scheduling?
**ryoqun:** oh, thanks for well-deserved good questions! hope these in-source comments answer both of your questions: 2897280
**Reviewer:** I think so? I think the main reason is we want to share most of the same code, since there are checks for non-self-conflicting entries. And since scheduling doesn't block on execution, there shouldn't be too much overhead from just collecting the multiple entries instead of scheduling them as they come in?
**ryoqun:** yeah. that's correct.
**ryoqun:** fyi, I improved the in-source comment a bit: 6e63d87
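On the error-propagation question above: since `schedule_batches_for_execution` can only ever return `Ok(())`, an invalid block has to surface later, when replay joins the scheduler; the in-source comments added in 2897280 and 6e63d87 describe the actual design. As a hypothetical sketch of the general deferred-error pattern (names and types here are made up, not the solana-runtime API):

```rust
use std::sync::Mutex;

// Hypothetical sketch, not the actual solana-runtime API: a scheduler
// handle that remembers the first execution error and reports it when
// replay joins it at the block boundary.
#[derive(Default)]
pub struct SchedulerHandle {
    first_error: Mutex<Option<String>>,
}

impl SchedulerHandle {
    /// Called from executor threads as transactions finish; never
    /// blocks the scheduling side and keeps only the first error.
    pub fn record_result(&self, result: Result<(), String>) {
        if let Err(e) = result {
            self.first_error.lock().unwrap().get_or_insert(e);
        }
    }

    /// Called once by replay when the block is complete; this is the
    /// point where an invalid block finally becomes visible as an Err.
    pub fn wait_for_completion(&self) -> Result<(), String> {
        match self.first_error.lock().unwrap().take() {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }
}
```

In this shape scheduling stays non-blocking and infallible, yet the first execution error still reaches the replay stage at the block boundary.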