From 9f4a90e656e982c1dc75e7ed982474230ddc5b49 Mon Sep 17 00:00:00 2001
From: Dmitri Makarov
Date: Thu, 25 Jul 2024 16:55:55 -0400
Subject: [PATCH] Refactor data structure representing account candidates for cleaning

AccountsDB::clean_accounts makes unnecessary copies of a large number
of pubkeys, and of their accompanying information, in order to find and
operate on the accounts that can be deleted from the accounts index.
With this change the candidates for deletion are organized in a single
data structure whose entries are updated in place, reducing the memory
requirements of the cleaning procedure.

---
 accounts-db/src/accounts_db.rs | 381 +++++++++++++++++++--------------
 1 file changed, 220 insertions(+), 161 deletions(-)

diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs
index 053782133ee6a2..c0988f7614a49a 100644
--- a/accounts-db/src/accounts_db.rs
+++ b/accounts-db/src/accounts_db.rs
@@ -105,7 +105,7 @@ use {
         path::{Path, PathBuf},
         sync::{
             atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering},
-            Arc, Condvar, Mutex,
+            Arc, Condvar, Mutex, RwLock,
         },
         thread::{sleep, Builder},
         time::{Duration, Instant},
@@ -1343,6 +1343,19 @@ impl StoreAccountsTiming {
     }
 }
 
+#[derive(Default, Debug)]
+struct CleaningInfo {
+    pub slot_list: SlotList<AccountInfo>,
+    pub ref_count: u64,
+}
+
+impl CleaningInfo {
+    pub fn update(&mut self, slot_list: SlotList<AccountInfo>, ref_count: u64) {
+        self.slot_list = slot_list;
+        self.ref_count = ref_count;
+    }
+}
+
 /// Removing unrooted slots in Accounts Background Service needs to be synchronized with flushing
 /// slots from the Accounts Cache. This keeps track of those slots and the Mutex + Condvar for
 /// synchronization.
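The core of the change is visible above: instead of a flat map from pubkey to a (slot list, ref count) tuple that gets copied between stages, each candidate now owns a `CleaningInfo` that later stages fill in place, and the candidates are split into per-bin maps behind independent `RwLock`s. A minimal, self-contained sketch of that shape (every type and the `NUM_BINS` constant here are simplified stand-ins, not the accounts-db definitions):

    use std::{collections::HashMap, sync::RwLock};

    type Slot = u64;
    type Pubkey = [u8; 32];
    // Simplified stand-ins for the accounts-db types.
    #[derive(Debug, Default)]
    struct AccountInfo;
    type SlotList<T> = Vec<(Slot, T)>;

    #[derive(Default, Debug)]
    struct CleaningInfo {
        slot_list: SlotList<AccountInfo>,
        ref_count: u64,
    }

    fn main() {
        const NUM_BINS: usize = 4; // placeholder; the real code asks accounts_index.bins()
        // One lock per bin, so writers touching different bins never contend.
        let candidates: Vec<RwLock<HashMap<Pubkey, CleaningInfo>>> =
            (0..NUM_BINS).map(|_| RwLock::new(HashMap::new())).collect();

        // Register a candidate with empty cleaning info...
        let key = [7u8; 32];
        let bin = 0; // the real code derives the bin from the pubkey
        candidates[bin]
            .write()
            .unwrap()
            .insert(key, CleaningInfo::default());

        // ...and later fill it in place instead of copying it into a second map.
        if let Some(info) = candidates[bin].write().unwrap().get_mut(&key) {
            info.slot_list = vec![(42, AccountInfo)];
            info.ref_count = 1;
        }
        assert_eq!(candidates[bin].read().unwrap()[&key].ref_count, 1);
    }
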
@@ -2742,7 +2755,7 @@ impl AccountsDb {
     /// 1. one of the pubkeys in the store has account info to a store whose store count is not going to zero
     /// 2. a pubkey we were planning to remove is not removing all stores that contain the account
     fn calc_delete_dependencies(
-        purges: &HashMap<Pubkey, (SlotList<AccountInfo>, RefCount)>,
+        purges: &Vec<RwLock<HashMap<Pubkey, CleaningInfo>>>,
         store_counts: &mut HashMap<Slot, (usize, HashSet<Pubkey>)>,
         min_slot: Option<Slot>,
     ) {
@@ -2750,78 +2763,87 @@ impl AccountsDb {
         // do not match the criteria of deleting all appendvecs which contain them
         // then increment their storage count.
         let mut already_counted = IntSet::default();
-        for (pubkey, (account_infos, ref_count_from_storage)) in purges.iter() {
-            let mut failed_slot = None;
-            let all_stores_being_deleted =
-                account_infos.len() as RefCount == *ref_count_from_storage;
-            if all_stores_being_deleted {
-                let mut delete = true;
-                for (slot, _account_info) in account_infos {
-                    if let Some(count) = store_counts.get(slot).map(|s| s.0) {
-                        debug!(
-                            "calc_delete_dependencies()
+        for bin in purges {
+            let bin = bin.read().unwrap();
+            for (
+                pubkey,
+                CleaningInfo {
+                    slot_list,
+                    ref_count,
+                },
+            ) in bin.iter()
+            {
+                let mut failed_slot = None;
+                let all_stores_being_deleted = slot_list.len() as RefCount == *ref_count;
+                if all_stores_being_deleted {
+                    let mut delete = true;
+                    for (slot, _account_info) in slot_list {
+                        if let Some(count) = store_counts.get(slot).map(|s| s.0) {
+                            debug!(
+                                "calc_delete_dependencies()
                             slot: {slot}, count len: {count}"
-                        );
-                        if count == 0 {
-                            // this store CAN be removed
-                            continue;
+                            );
+                            if count == 0 {
+                                // this store CAN be removed
+                                continue;
+                            }
                         }
+                        // One of the pubkeys in the store has account info to a store whose store count is not going to zero.
+                        // If the store cannot be found, that also means the store isn't being deleted.
+                        failed_slot = Some(*slot);
+                        delete = false;
+                        break;
                     }
-                    // One of the pubkeys in the store has account info to a store whose store count is not going to zero.
-                    // If the store cannot be found, that also means store isn't being deleted.
-                    failed_slot = Some(*slot);
-                    delete = false;
-                    break;
-                }
-                if delete {
-                    // this pubkey can be deleted from all stores it is in
-                    continue;
-                }
-            } else {
-                // a pubkey we were planning to remove is not removing all stores that contain the account
-                debug!(
-                    "calc_delete_dependencies(),
+                    if delete {
+                        // this pubkey can be deleted from all stores it is in
+                        continue;
+                    }
+                } else {
+                    // a pubkey we were planning to remove is not removing all stores that contain the account
+                    debug!(
+                        "calc_delete_dependencies(),
                     pubkey: {},
                     account_infos: {:?},
                     account_infos_len: {},
                     ref_count_from_storage: {}",
-                    pubkey,
-                    account_infos,
-                    account_infos.len(),
-                    ref_count_from_storage,
-                );
-            }
-
-            // increment store_counts to non-zero for all stores that can not be deleted.
-            let mut pending_stores = IntSet::default();
-            for (slot, _account_info) in account_infos {
-                if !already_counted.contains(slot) {
-                    pending_stores.insert(*slot);
+                        pubkey,
+                        slot_list,
+                        slot_list.len(),
+                        ref_count,
+                    );
                 }
-            }
-            while !pending_stores.is_empty() {
-                let slot = pending_stores.iter().next().cloned().unwrap();
-                if Some(slot) == min_slot {
-                    if let Some(failed_slot) = failed_slot.take() {
-                        info!("calc_delete_dependencies, oldest slot is not able to be deleted because of {pubkey} in slot {failed_slot}");
-                    } else {
-                        info!("calc_delete_dependencies, oldest slot is not able to be deleted because of {pubkey}, account infos len: {}, ref count: {ref_count_from_storage}", account_infos.len());
+
+                // increment store_counts to non-zero for all stores that cannot be deleted.
+                let mut pending_stores = IntSet::default();
+                for (slot, _account_info) in slot_list {
+                    if !already_counted.contains(slot) {
+                        pending_stores.insert(*slot);
                     }
                 }
+                while !pending_stores.is_empty() {
+                    let slot = pending_stores.iter().next().cloned().unwrap();
+                    if Some(slot) == min_slot {
+                        if let Some(failed_slot) = failed_slot.take() {
+                            info!("calc_delete_dependencies, oldest slot is not able to be deleted because of {pubkey} in slot {failed_slot}");
+                        } else {
+                            info!("calc_delete_dependencies, oldest slot is not able to be deleted because of {pubkey}, account infos len: {}, ref count: {ref_count}", slot_list.len());
+                        }
+                    }
 
-                pending_stores.remove(&slot);
-                if !already_counted.insert(slot) {
-                    continue;
-                }
-                // the point of all this code: remove the store count for all stores we cannot remove
-                if let Some(store_count) = store_counts.remove(&slot) {
-                    // all pubkeys in this store also cannot be removed from all stores they are in
-                    let affected_pubkeys = &store_count.1;
-                    for key in affected_pubkeys {
-                        for (slot, _account_info) in &purges.get(key).unwrap().0 {
-                            if !already_counted.contains(slot) {
-                                pending_stores.insert(*slot);
+                    pending_stores.remove(&slot);
+                    if !already_counted.insert(slot) {
+                        continue;
+                    }
+                    // the point of all this code: remove the store count for all stores we cannot remove
+                    if let Some(store_count) = store_counts.remove(&slot) {
+                        // all pubkeys in this store also cannot be removed from all stores they are in
+                        let affected_pubkeys = &store_count.1;
+                        for key in affected_pubkeys {
+                            for (slot, _account_info) in &bin.get(key).unwrap().slot_list {
+                                if !already_counted.contains(slot) {
+                                    pending_stores.insert(*slot);
+                                }
                             }
                         }
                     }
                 }
+            }
         }
     }
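The dependency pass above is a worklist fixed point: pinning one store pins every pubkey it contains, which in turn pins every other store those pubkeys live in, until nothing new becomes reachable. A sketch of the same propagation over toy data, with the logging and `min_slot` reporting omitted (`store_pubkeys` and `pubkey_stores` are hypothetical stand-ins for `store_counts` and the candidate bins):

    use std::collections::{HashMap, HashSet};

    type Slot = u64;

    /// Transitively mark stores that cannot be deleted, starting from `seed`.
    fn propagate_non_removable(
        seed: Slot,
        store_pubkeys: &HashMap<Slot, Vec<&'static str>>, // store -> pubkeys it contains
        pubkey_stores: &HashMap<&'static str, Vec<Slot>>, // pubkey -> stores containing it
    ) -> HashSet<Slot> {
        let mut already_counted = HashSet::new();
        let mut pending_stores: HashSet<Slot> = HashSet::from([seed]);
        while !pending_stores.is_empty() {
            let slot = *pending_stores.iter().next().unwrap();
            pending_stores.remove(&slot);
            if !already_counted.insert(slot) {
                continue; // already processed
            }
            // Every pubkey in a pinned store pins all the other stores it lives in.
            for key in store_pubkeys.get(&slot).into_iter().flatten() {
                for &other in pubkey_stores.get(key).into_iter().flatten() {
                    if !already_counted.contains(&other) {
                        pending_stores.insert(other);
                    }
                }
            }
        }
        already_counted
    }

    fn main() {
        let store_pubkeys =
            HashMap::from([(10, vec!["a"]), (11, vec!["a", "b"]), (12, vec!["b"])]);
        let pubkey_stores = HashMap::from([("a", vec![10, 11]), ("b", vec![11, 12])]);
        // Pinning store 10 transitively pins 11 (via "a") and then 12 (via "b").
        let pinned = propagate_non_removable(10, &store_pubkeys, &pubkey_stores);
        assert_eq!(pinned, HashSet::from([10, 11, 12]));
    }
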
@@ -2995,7 +3017,7 @@ impl AccountsDb {
         last_full_snapshot_slot: Option<Slot>,
         timings: &mut CleanKeyTimings,
         epoch_schedule: &EpochSchedule,
-    ) -> (Vec<Pubkey>, Option<Slot>) {
+    ) -> (Vec<RwLock<HashMap<Pubkey, CleaningInfo>>>, Option<Slot>) {
         let oldest_non_ancient_slot = self.get_oldest_non_ancient_slot(epoch_schedule);
         let mut dirty_store_processing_time = Measure::start("dirty_store_processing");
         let max_slot_inclusive =
@@ -3014,7 +3036,10 @@ impl AccountsDb {
             }
         });
         let dirty_stores_len = dirty_stores.len();
-        let pubkeys = DashSet::new();
+        let num_bins = self.accounts_index.bins();
+        let pubkeys: Vec<RwLock<HashMap<Pubkey, CleaningInfo>>> = (0..num_bins)
+            .map(|_| RwLock::new(HashMap::new()))
+            .collect::<Vec<_>>();
         let dirty_ancient_stores = AtomicUsize::default();
         let mut dirty_store_routine = || {
             let chunk_size = 1.max(dirty_stores_len.saturating_div(rayon::current_num_threads()));
@@ -3027,8 +3052,10 @@ impl AccountsDb {
                         dirty_ancient_stores.fetch_add(1, Ordering::Relaxed);
                     }
                     oldest_dirty_slot = oldest_dirty_slot.min(*slot);
-                    store.accounts.scan_pubkeys(|k| {
-                        pubkeys.insert(*k);
+                    store.accounts.scan_pubkeys(|key| {
+                        let index = self.accounts_index.bin_calculator.bin_from_pubkey(key);
+                        let mut pubkeys_bin = pubkeys[index].write().unwrap();
+                        pubkeys_bin.insert(*key, CleaningInfo::default());
                     });
                 });
                 oldest_dirty_slot
@@ -3051,9 +3078,15 @@ impl AccountsDb {
         trace!(
             "dirty_stores.len: {} pubkeys.len: {}",
             dirty_stores_len,
-            pubkeys.len()
+            pubkeys
+                .iter()
+                .map(|x| x.read().unwrap().len())
+                .sum::<usize>(),
         );
-        timings.dirty_pubkeys_count = pubkeys.len() as u64;
+        timings.dirty_pubkeys_count = pubkeys
+            .iter()
+            .map(|x| x.read().unwrap().len())
+            .sum::<usize>() as u64;
         dirty_store_processing_time.stop();
         timings.dirty_store_processing_us += dirty_store_processing_time.as_us();
         timings.dirty_ancient_stores = dirty_ancient_stores.load(Ordering::Relaxed);
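Each dirty pubkey now lands in the bin computed from the key itself, and counts that used to be a single `len()` become a sum over the per-bin maps. A sketch of both patterns (`bin_from_pubkey` below is a hypothetical stand-in for the calculator at `self.accounts_index.bin_calculator`):

    use std::{collections::HashMap, sync::RwLock};

    type Pubkey = [u8; 32];

    /// Hypothetical stand-in: bin by the first byte of the key. The real
    /// calculator derives the bin from the leading bits of the pubkey.
    fn bin_from_pubkey(key: &Pubkey, num_bins: usize) -> usize {
        key[0] as usize % num_bins
    }

    fn main() {
        let num_bins = 8;
        let bins: Vec<RwLock<HashMap<Pubkey, u64>>> =
            (0..num_bins).map(|_| RwLock::new(HashMap::new())).collect();

        // Insert each key into the bin derived from the key itself.
        for byte in [1u8, 2, 3, 9] {
            let key = [byte; 32];
            let index = bin_from_pubkey(&key, num_bins);
            bins[index].write().unwrap().insert(key, 0);
        }

        // The candidate count is now the sum of per-bin lengths
        // (this replaces the single DashSet's len()).
        let total: usize = bins.iter().map(|b| b.read().unwrap().len()).sum();
        assert_eq!(total, 4);
    }
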
@@ -3068,17 +3101,21 @@ impl AccountsDb {
         self.thread_pool_clean.install(|| {
             delta_keys.par_iter().for_each(|keys| {
                 for key in keys {
-                    pubkeys.insert(*key);
+                    let index = self.accounts_index.bin_calculator.bin_from_pubkey(key);
+                    let mut pubkeys_bin = pubkeys[index].write().unwrap();
+                    pubkeys_bin.insert(*key, CleaningInfo::default());
                 }
             });
         });
         delta_insert.stop();
         timings.delta_insert_us += delta_insert.as_us();
 
-        timings.delta_key_count = pubkeys.len() as u64;
+        timings.delta_key_count = pubkeys
+            .iter()
+            .map(|x| x.read().unwrap().len())
+            .sum::<usize>() as u64;
 
         let mut hashset_to_vec = Measure::start("flat_map");
-        let mut pubkeys: Vec<Pubkey> = pubkeys.into_iter().collect();
         hashset_to_vec.stop();
         timings.hashset_to_vec_us += hashset_to_vec.as_us();
@@ -3094,7 +3131,9 @@ impl AccountsDb {
                 let is_candidate_for_clean =
                     max_slot_inclusive >= *slot && last_full_snapshot_slot >= *slot;
                 if is_candidate_for_clean {
-                    pubkeys.push(*pubkey);
+                    let index = self.accounts_index.bin_calculator.bin_from_pubkey(pubkey);
+                    let mut pubkeys_bin = pubkeys[index].write().unwrap();
+                    pubkeys_bin.insert(*pubkey, CleaningInfo::default());
                 }
                 !is_candidate_for_clean
             });
@@ -3204,7 +3243,7 @@ impl AccountsDb {
         self.report_store_stats();
 
         let mut key_timings = CleanKeyTimings::default();
-        let (mut candidates, min_dirty_slot) = self.construct_candidate_clean_keys(
+        let (candidates, min_dirty_slot) = self.construct_candidate_clean_keys(
             max_clean_root_inclusive,
             is_startup,
             last_full_snapshot_slot,
             &mut key_timings,
             epoch_schedule,
         );
 
-        let mut sort = Measure::start("sort");
-        if is_startup {
-            candidates.par_sort_unstable();
-        } else {
-            self.thread_pool_clean
-                .install(|| candidates.par_sort_unstable());
-        }
-        sort.stop();
-
-        let num_candidates = candidates.len();
+        let num_candidates = candidates
+            .iter()
+            .map(|x| x.read().unwrap().len())
+            .sum::<usize>();
         let mut accounts_scan = Measure::start("accounts_scan");
         let uncleaned_roots = self.accounts_index.clone_uncleaned_roots();
         let found_not_zero_accum = AtomicU64::new(0);
         let not_found_on_fork_accum = AtomicU64::new(0);
         let missing_accum = AtomicU64::new(0);
         let useful_accum = AtomicU64::new(0);
 
         // parallel scan the index.
-        let (mut purges_zero_lamports, purges_old_accounts) = {
+        let purges_old_accounts = {
             let do_clean_scan = || {
                 candidates
-                    .par_chunks(4096)
-                    .map(|candidates: &[Pubkey]| {
-                        let mut purges_zero_lamports = HashMap::new();
+                    .par_iter()
+                    .map(|bin| {
                         let mut purges_old_accounts = Vec::new();
                         let mut found_not_zero = 0;
                         let mut not_found_on_fork = 0;
                         let mut missing = 0;
                         let mut useful = 0;
+                        // The keys must be copied out of the bin before being passed to
+                        // accounts_index.scan(); otherwise two simultaneous, conflicting
+                        // borrows of the bin would occur:
+                        // 1. an immutable borrow to iterate over the keys, and
+                        // 2. a mutable borrow to update the bin's values in the callback
+                        //    closure.
+                        // An alternative is to wrap the bin values in RefCell for
+                        // non-exclusive access.
+                        let keys: Vec<Pubkey> = bin.read().unwrap().keys().copied().collect();
+                        let mut bin = bin.write().unwrap();
                         self.accounts_index.scan(
-                            candidates.iter(),
+                            keys.iter(),
                             |candidate, slot_list_and_ref_count, _entry| {
                                 let mut useless = true;
                                 if let Some((slot_list, ref_count)) = slot_list_and_ref_count {
@@ -3260,19 +3300,18 @@ impl AccountsDb {
                                             &slot_list[index_in_slot_list];
                                         if account_info.is_zero_lamport() {
                                             useless = false;
-                                            // the latest one is zero lamports. we may be able to purge it.
-                                            // so, add to purges_zero_lamports
-                                            purges_zero_lamports.insert(
-                                                *candidate,
-                                                (
-                                                    // add all the rooted entries that contain this pubkey.
-                                                    // we know the highest rooted entry is zero lamports
+                                            // The latest one is zero lamports. We may be able to purge it.
+                                            if let Some(val) = bin.get_mut(candidate) {
+                                                val.update(
+                                                    // Add all the rooted entries that contain this pubkey.
+                                                    // We know the highest rooted entry is zero lamports.
                                                     self.accounts_index.get_rooted_entries(
                                                         slot_list,
                                                         max_clean_root_inclusive,
                                                     ),
                                                     ref_count,
-                                                ),
-                                            );
+                                                );
+                                            }
                                         } else {
                                             found_not_zero += 1;
                                         }
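The comment introduced in the scan hunk above describes a borrow-checker constraint: a map's keys cannot be iterated while a callback mutates the same map. A minimal sketch of the conflict and of the snapshot-the-keys workaround the patch uses (`scan` below is a toy stand-in for `accounts_index.scan`); as the comment notes, wrapping the values in `RefCell` would be an alternative:

    use std::collections::HashMap;

    /// A scan-like helper in the spirit of accounts_index.scan: it drives a
    /// callback once per key.
    fn scan<'a>(keys: impl Iterator<Item = &'a String>, mut callback: impl FnMut(&str)) {
        for key in keys {
            callback(key);
        }
    }

    fn main() {
        let mut bin: HashMap<String, u64> = HashMap::from([("a".into(), 0), ("b".into(), 0)]);

        // This would not compile: `bin.keys()` holds an immutable borrow of `bin`
        // for the duration of the call, while the callback needs `&mut bin`:
        //
        //     scan(bin.keys(), |key| {
        //         *bin.get_mut(key).unwrap() += 1; // error[E0502]
        //     });

        // The fix used in the patch: snapshot the keys first, then mutate freely.
        let keys: Vec<String> = bin.keys().cloned().collect();
        scan(keys.iter(), |key| {
            *bin.get_mut(key).unwrap() += 1;
        });
        assert!(bin.values().all(|&v| v == 1));
    }
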
@@ -3315,17 +3354,13 @@ impl AccountsDb {
                         not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed);
                         missing_accum.fetch_add(missing, Ordering::Relaxed);
                         useful_accum.fetch_add(useful, Ordering::Relaxed);
-                        (purges_zero_lamports, purges_old_accounts)
+                        purges_old_accounts
+                    })
+                    .reduce(Vec::new, |mut a, b| {
+                        // Collapse down the vecs into one.
+                        a.extend(b);
+                        a
                     })
-                    .reduce(
-                        || (HashMap::new(), Vec::new()),
-                        |mut a, b| {
-                            // Collapse down the hashmaps/vecs into one.
-                            a.0.extend(b.0);
-                            a.1.extend(b.1);
-                            a
-                        },
-                    )
             };
             if is_startup {
                 do_clean_scan()
@@ -3352,11 +3387,22 @@ impl AccountsDb {
         // Calculate store counts as if everything was purged
         // Then purge if we can
         let mut store_counts: HashMap<Slot, (usize, HashSet<Pubkey>)> = HashMap::new();
-        for (pubkey, (slot_list, ref_count)) in purges_zero_lamports.iter_mut() {
-            if purged_account_slots.contains_key(pubkey) {
-                *ref_count = self.accounts_index.ref_count_from_storage(pubkey);
-            }
-            slot_list.retain(|(slot, account_info)| {
+        for bin in candidates
+            .iter()
+            .filter(|bin| !bin.read().unwrap().is_empty())
+        {
+            for (
+                pubkey,
+                CleaningInfo {
+                    slot_list,
+                    ref_count,
+                },
+            ) in bin.write().unwrap().iter_mut()
+            {
+                if purged_account_slots.contains_key(pubkey) {
+                    *ref_count = self.accounts_index.ref_count_from_storage(pubkey);
+                }
+                slot_list.retain(|(slot, account_info)| {
                 let was_slot_purged = purged_account_slots
                     .get(pubkey)
                     .map(|slots_removed| slots_removed.contains(slot))
@@ -3401,11 +3447,12 @@ impl AccountsDb {
                 }
                 true
             });
+            }
         }
         store_counts_time.stop();
 
         let mut calc_deps_time = Measure::start("calc_deps");
-        Self::calc_delete_dependencies(&purges_zero_lamports, &mut store_counts, min_dirty_slot);
+        Self::calc_delete_dependencies(&candidates, &mut store_counts, min_dirty_slot);
         calc_deps_time.stop();
 
         let mut purge_filter = Measure::start("purge_filter");
         self.filter_zero_lamport_clean_for_incremental_snapshots(
             max_clean_root_inclusive,
             last_full_snapshot_slot,
             &store_counts,
-            &mut purges_zero_lamports,
+            &candidates,
         );
         purge_filter.stop();
 
         let mut reclaims_time = Measure::start("reclaims");
         // Recalculate reclaims with new purge set
-        let pubkey_to_slot_set: Vec<_> = purges_zero_lamports
-            .into_iter()
-            .map(|(key, (slots_list, _ref_count))| {
-                (
-                    key,
-                    slots_list
-                        .into_iter()
-                        .map(|(slot, _)| slot)
-                        .collect::<HashSet<Slot>>(),
-                )
-            })
-            .collect();
+        let mut pubkey_to_slot_set: Vec<(Pubkey, HashSet<Slot>)> = Vec::new();
+        for bin in candidates {
+            let bin = bin.read().unwrap();
+            if !bin.is_empty() {
+                for (pubkey, cleaning_info) in bin.iter() {
+                    let CleaningInfo {
+                        slot_list,
+                        ref_count: _,
+                    } = cleaning_info;
+                    pubkey_to_slot_set.push((
+                        *pubkey,
+                        slot_list
+                            .iter()
+                            .map(|(slot, _)| *slot)
+                            .collect::<HashSet<Slot>>(),
+                    ));
+                }
+            }
+        }
 
         let (reclaims, pubkeys_removed_from_accounts_index2) =
             self.purge_keys_exact(pubkey_to_slot_set.iter());
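Before reclaiming, the surviving candidates are flattened from the binned maps into `(pubkey, slot set)` pairs. A self-contained sketch of that flattening (same toy stand-in types as earlier; the account info payload of the slot list is elided):

    use std::{
        collections::{HashMap, HashSet},
        sync::RwLock,
    };

    type Slot = u64;
    type Pubkey = [u8; 32];

    // Toy stand-in for the patch's CleaningInfo.
    #[derive(Default)]
    struct CleaningInfo {
        slot_list: Vec<(Slot, ())>,
        ref_count: u64,
    }

    fn main() {
        let bins: Vec<RwLock<HashMap<Pubkey, CleaningInfo>>> =
            (0..2).map(|_| RwLock::new(HashMap::new())).collect();
        bins[1].write().unwrap().insert(
            [9u8; 32],
            CleaningInfo {
                slot_list: vec![(3, ()), (5, ())],
                ref_count: 1,
            },
        );

        // Flatten every bin into (pubkey, {slots}) pairs, the shape that a
        // purge_keys_exact-style API consumes.
        let mut pubkey_to_slot_set: Vec<(Pubkey, HashSet<Slot>)> = Vec::new();
        for bin in &bins {
            let bin = bin.read().unwrap();
            for (pubkey, info) in bin.iter() {
                pubkey_to_slot_set
                    .push((*pubkey, info.slot_list.iter().map(|(slot, _)| *slot).collect()));
            }
        }

        assert_eq!(pubkey_to_slot_set.len(), 1);
        assert_eq!(pubkey_to_slot_set[0].1, HashSet::from([3, 5]));
        assert_eq!(bins[1].read().unwrap()[&[9u8; 32]].ref_count, 1);
    }
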
@@ -3484,7 +3538,6 @@ impl AccountsDb {
             ("delta_insert_us", key_timings.delta_insert_us, i64),
             ("delta_key_count", key_timings.delta_key_count, i64),
             ("dirty_pubkeys_count", key_timings.dirty_pubkeys_count, i64),
-            ("sort_us", sort.as_us(), i64),
             ("useful_keys", useful_accum.load(Ordering::Relaxed), i64),
             ("total_keys_count", num_candidates, i64),
             (
@@ -3665,7 +3718,7 @@ impl AccountsDb {
         max_clean_root_inclusive: Option<Slot>,
         last_full_snapshot_slot: Option<Slot>,
         store_counts: &HashMap<Slot, (usize, HashSet<Pubkey>)>,
-        purges_zero_lamports: &mut HashMap<Pubkey, (SlotList<AccountInfo>, RefCount)>,
+        purges_zero_lamports: &Vec<RwLock<HashMap<Pubkey, CleaningInfo>>>,
     ) {
         let should_filter_for_incremental_snapshots = max_clean_root_inclusive.unwrap_or(Slot::MAX)
             > last_full_snapshot_slot.unwrap_or(Slot::MAX);
         assert!(
             !should_filter_for_incremental_snapshots || last_full_snapshot_slot.is_some(),
             "if filtering for incremental snapshots, then snapshots should be enabled",
         );
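The `unwrap_or(Slot::MAX)` comparison above treats a missing bound as infinitely far in the future: filtering is needed only when cleaning may run past the last full snapshot. The decision logic in isolation:

    type Slot = u64;

    /// Filter for incremental snapshots only when cleaning may run past the
    /// last full snapshot; a missing bound behaves like "infinity" via Slot::MAX.
    fn should_filter(
        max_clean_root_inclusive: Option<Slot>,
        last_full_snapshot_slot: Option<Slot>,
    ) -> bool {
        max_clean_root_inclusive.unwrap_or(Slot::MAX)
            > last_full_snapshot_slot.unwrap_or(Slot::MAX)
    }

    fn main() {
        // Cleaning beyond the last full snapshot: zero-lamport purges must be filtered.
        assert!(should_filter(Some(200), Some(100)));
        // No clean root bound at all behaves like Slot::MAX, so filtering still applies...
        assert!(should_filter(None, Some(100)));
        // ...but with no full snapshot either, both sides are Slot::MAX and nothing is filtered.
        assert!(!should_filter(None, None));
        assert!(!should_filter(Some(50), Some(100)));
    }
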
@@ -3674,45 +3727,51 @@ impl AccountsDb {
-        purges_zero_lamports.retain(|pubkey, (slot_account_infos, _ref_count)| {
-            // Only keep purges_zero_lamports where the entire history of the account in the root set
-            // can be purged. All AppendVecs for those updates are dead.
-            for (slot, _account_info) in slot_account_infos.iter() {
-                if let Some(store_count) = store_counts.get(slot) {
-                    if store_count.0 != 0 {
-                        // one store this pubkey is in is not being removed, so this pubkey cannot be removed at all
+        for bin in purges_zero_lamports {
+            let mut bin = bin.write().unwrap();
+            bin.retain(|pubkey, cleaning_info| {
+                let CleaningInfo {
+                    slot_list,
+                    ref_count: _,
+                } = cleaning_info;
+                // Only keep purges_zero_lamports where the entire history of the account in the root set
+                // can be purged. All AppendVecs for those updates are dead.
+                for (slot, _account_info) in slot_list.iter() {
+                    if let Some(store_count) = store_counts.get(slot) {
+                        if store_count.0 != 0 {
+                            // one store this pubkey is in is not being removed, so this pubkey cannot be removed at all
+                            return false;
+                        }
+                    } else {
+                        // store is not being removed, so this pubkey cannot be removed at all
                         return false;
                     }
-                } else {
-                    // store is not being removed, so this pubkey cannot be removed at all
-                    return false;
                 }
-            }
 
-            // Exit early if not filtering more for incremental snapshots
-            if !should_filter_for_incremental_snapshots {
-                return true;
-            }
-
-            let slot_account_info_at_highest_slot = slot_account_infos
-                .iter()
-                .max_by_key(|(slot, _account_info)| slot);
-
-            slot_account_info_at_highest_slot.map_or(true, |(slot, account_info)| {
-                // Do *not* purge zero-lamport accounts if the slot is greater than the last full
-                // snapshot slot. Since we're `retain`ing the accounts-to-purge, I felt creating
-                // the `cannot_purge` variable made this easier to understand. Accounts that do
-                // not get purged here are added to a list so they be considered for purging later
-                // (i.e. after the next full snapshot).
-                assert!(account_info.is_zero_lamport());
-                let cannot_purge = *slot > last_full_snapshot_slot.unwrap();
-                if cannot_purge {
-                    self.zero_lamport_accounts_to_purge_after_full_snapshot
-                        .insert((*slot, *pubkey));
+                // Exit early if not filtering more for incremental snapshots
+                if !should_filter_for_incremental_snapshots {
+                    return true;
                 }
-                !cannot_purge
-            })
-        });
+
+                let slot_account_info_at_highest_slot =
+                    slot_list.iter().max_by_key(|(slot, _account_info)| slot);
+
+                slot_account_info_at_highest_slot.map_or(true, |(slot, account_info)| {
+                    // Do *not* purge zero-lamport accounts if the slot is greater than the last full
+                    // snapshot slot. Since we're `retain`ing the accounts-to-purge, I felt creating
+                    // the `cannot_purge` variable made this easier to understand. Accounts that do
+                    // not get purged here are added to a list so they can be considered for purging
+                    // later (i.e. after the next full snapshot).
+                    assert!(account_info.is_zero_lamport());
+                    let cannot_purge = *slot > last_full_snapshot_slot.unwrap();
+                    if cannot_purge {
+                        self.zero_lamport_accounts_to_purge_after_full_snapshot
+                            .insert((*slot, *pubkey));
+                    }
+                    !cannot_purge
+                })
+            });
+        }
     }
 
     // Must be kept private!, does sensitive cleanup that should only be called from
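The filter itself is a `retain` over each bin under its write lock, and one surviving store is enough to drop a candidate from the purge set. A condensed sketch of that predicate (store counts are reduced here to a set of slots whose stores keep live accounts; all names are illustrative):

    use std::{
        collections::{HashMap, HashSet},
        sync::RwLock,
    };

    type Slot = u64;
    type Pubkey = [u8; 32];

    fn main() {
        // Candidate -> slots it appears in.
        let bin: RwLock<HashMap<Pubkey, Vec<Slot>>> = RwLock::new(HashMap::from([
            ([1u8; 32], vec![10, 11]), // slot 11's store survives -> dropped from the purge set
            ([2u8; 32], vec![12]),     // every store dead -> candidate stays purgeable
        ]));
        let stores_with_live_accounts: HashSet<Slot> = HashSet::from([11]);

        bin.write().unwrap().retain(|_pubkey, slot_list| {
            // Keep a candidate only if the entire history of the account can go:
            // one surviving store is enough to disqualify it.
            slot_list
                .iter()
                .all(|slot| !stores_with_live_accounts.contains(slot))
        });

        assert_eq!(bin.read().unwrap().len(), 1);
        assert!(bin.read().unwrap().contains_key(&[2u8; 32]));
    }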