Skip to content

Commit

Permalink
Support concurrent threads locking the same transaction (#19526)
Browse files Browse the repository at this point in the history
Fix crash when two threads concurrently lock the same transaction.

The new test case fails reliably if the fix is not present.
  • Loading branch information
mystenmark authored Sep 24, 2024
1 parent b39e43d commit 2de49a4
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 33 deletions.
62 changes: 29 additions & 33 deletions crates/sui-core/src/execution_cache/object_locks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ use sui_types::error::{SuiError, SuiResult, UserInputError};
use sui_types::object::Object;
use sui_types::storage::ObjectStore;
use sui_types::transaction::VerifiedSignedTransaction;
use tracing::{debug, info, instrument, trace};
use tracing::{debug, error, info, instrument, trace};

use super::writeback_cache::WritebackCache;

// Number of in-flight lock attempts that currently reference a cached entry in
// `locked_transactions`. An entry starts at 1 when it is inserted, is incremented each time
// another locker finds the entry already present, and is decremented when cached locks are
// cleared; the entry is removed from the map once the count reaches zero.
type RefCount = usize;

pub(super) struct ObjectLocks {
// When acquiring transaction locks, lock entries are briefly inserted into this map. The map
// exists to provide atomic test-and-set operations on the locks. After all locks have been inserted
Expand All @@ -23,7 +25,7 @@ pub(super) struct ObjectLocks {
// those objects. Therefore we do a db read for each object we are locking.
//
// TODO: find a strategy to allow us to avoid db reads for each object.
locked_transactions: DashMap<ObjectRef, LockDetails>,
locked_transactions: DashMap<ObjectRef, (RefCount, LockDetails)>,
}

impl ObjectLocks {
Expand All @@ -38,29 +40,10 @@ impl ObjectLocks {
obj_ref: &ObjectRef,
epoch_store: &AuthorityPerEpochStore,
) -> SuiResult<Option<LockDetails>> {
match self.locked_transactions.entry(*obj_ref) {
DashMapEntry::Vacant(vacant) => {
let tables = epoch_store.tables()?;
let lock = tables.get_locked_transaction(obj_ref)?;
if let Some(lock_details) = lock {
vacant.insert(lock_details);
}
Ok(lock)
}
DashMapEntry::Occupied(occupied) => {
if cfg!(debug_assertions) {
if let Some(lock_details) = epoch_store
.tables()
.unwrap()
.get_locked_transaction(obj_ref)
.unwrap()
{
assert_eq!(*occupied.get(), lock_details);
}
}
Ok(Some(*occupied.get()))
}
}
// We don't consult the in-memory state here. We are only interested in state that
// has been committed to the db. This is because the in-memory state is reverted
// if the transaction is not successfully locked.
epoch_store.tables()?.get_locked_transaction(obj_ref)
}

/// Attempts to atomically test-and-set a transaction lock on an object.
Expand Down Expand Up @@ -96,15 +79,18 @@ impl ObjectLocks {
let tables = epoch_store.tables()?;
if let Some(lock_details) = tables.get_locked_transaction(obj_ref)? {
trace!("read lock from db: {:?}", lock_details);
vacant.insert(lock_details);
vacant.insert((1, lock_details));
lock_details
} else {
trace!("set lock: {:?}", new_lock);
vacant.insert(new_lock);
vacant.insert((1, new_lock));
new_lock
}
}
DashMapEntry::Occupied(occupied) => *occupied.get(),
DashMapEntry::Occupied(mut occupied) => {
occupied.get_mut().0 += 1;
occupied.get().1
}
};

if prev_lock != new_lock {
Expand Down Expand Up @@ -156,14 +142,24 @@ impl ObjectLocks {
fn clear_cached_locks(&self, locks: &[(ObjectRef, LockDetails)]) {
for (obj_ref, lock) in locks {
let entry = self.locked_transactions.entry(*obj_ref);
let occupied = match entry {
DashMapEntry::Vacant(_) => panic!("lock must exist"),
let mut occupied = match entry {
DashMapEntry::Vacant(_) => {
if cfg!(debug_assertions) {
panic!("lock must exist");
} else {
error!(?obj_ref, "lock should exist");
}
continue;
}
DashMapEntry::Occupied(occupied) => occupied,
};

if occupied.get() == lock {
trace!("clearing lock: {:?}", lock);
occupied.remove();
if occupied.get().1 == *lock {
occupied.get_mut().0 -= 1;
if occupied.get().0 == 0 {
trace!("clearing lock: {:?}", lock);
occupied.remove();
}
} else {
// this is impossible because the only case in which we overwrite a
// lock is when the lock is from a previous epoch. but we are holding
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,79 @@ async fn test_concurrent_lockers() {
}
}

/// Two tasks repeatedly try to lock the *same* signed transaction over the same pair of
/// object refs, one transaction per round for 1000 rounds. Every lock attempt from both
/// tasks is expected to succeed.
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_concurrent_lockers_same_tx() {
    telemetry_subscribers::init_for_testing();

    let mut s = Scenario::new(None, Arc::new(AtomicU32::new(0))).await;
    let cache = s.cache.clone();

    // Prepare the whole workload up front: one signed transaction per round, paired with
    // the two object refs that both lockers will pass when locking it.
    let mut txns = Vec::new();
    for round in 0..1000 {
        let (first_id, second_id) = (round * 4, round * 4 + 1);
        s.with_created(&[first_id, second_id]);
        s.do_tx().await;

        // The refs are read before the outputs are taken, matching the required ordering.
        let first_ref = s.obj_ref(first_id);
        let second_ref = s.obj_ref(second_id);

        let outputs = s.take_outputs();
        let signed = s.make_signed_transaction(&outputs.transaction);

        txns.push((signed, first_ref, second_ref));
    }

    // A barrier of size two: after each lock attempt both tasks wait for each other, so
    // they start every round together and contend on the same transaction.
    let barrier = Arc::new(tokio::sync::Barrier::new(2));

    // Spawns one locker task working through its own copy of the workload and returning
    // the result of every lock attempt in order.
    let spawn_locker = || {
        let txns = txns.clone();
        let cache = cache.clone();
        let barrier = barrier.clone();
        let epoch_store = s.epoch_store.clone();
        tokio::task::spawn(async move {
            let mut outcomes = Vec::new();
            for (signed, first_ref, second_ref) in txns {
                let outcome = cache
                    .acquire_transaction_locks(&epoch_store, &[first_ref, second_ref], signed)
                    .await;
                outcomes.push(outcome);
                barrier.wait().await;
            }
            outcomes
        })
    };

    let locker_a = spawn_locker();
    let locker_b = spawn_locker();

    let results_a = locker_a.await.unwrap();
    let results_b = locker_b.await.unwrap();

    // Locking the same transaction from both tasks must succeed for each of them.
    results_a
        .into_iter()
        .zip(results_b)
        .for_each(|(from_a, from_b)| {
            assert!(from_a.is_ok());
            assert!(from_b.is_ok());
        });
}

#[tokio::test]
async fn latest_object_cache_race_test() {
let authority = TestAuthorityBuilder::new().build().await;
Expand Down

0 comments on commit 2de49a4

Please sign in to comment.