Skip to content

Commit a91cd03

Browse files
committed
rollout epoch based divergence
1 parent 9c31b46 commit a91cd03

File tree

6 files changed

+8
-58
lines changed

6 files changed

+8
-58
lines changed

cloud/replication_test.cc

+3-27
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,7 @@ class ReplicationTest : public testing::Test {
217217
// it will catch up until end of log
218218
//
219219
// Returns the number of log records applied
220-
size_t catchUpFollower(std::optional<size_t> num_records = std::nullopt,
221-
bool allow_new_manifest_writes = true);
220+
size_t catchUpFollower(std::optional<size_t> num_records = std::nullopt);
222221

223222
WriteOptions wo() const {
224223
WriteOptions w;
@@ -429,11 +428,7 @@ DB* ReplicationTest::openLeader(Options options, uint64_t snapshot_replication_e
429428
s = db->ApplyReplicationLogRecord(
430429
log_records_[leaderSeq].first, log_records_[leaderSeq].second,
431430
[this](Slice) { return ColumnFamilyOptions(leaderOptions()); },
432-
true /* allow_new_manifest_writes */,
433-
snapshot_replication_epoch_,
434-
&info,
435-
DB::AR_EVICT_OBSOLETE_FILES |
436-
DB::AR_EPOCH_BASED_DIVERGENCE_DETECTION);
431+
snapshot_replication_epoch_, &info, DB::AR_EVICT_OBSOLETE_FILES);
437432
assert(s.ok());
438433
assert(!info.diverged_manifest_writes);
439434
}
@@ -480,8 +475,7 @@ DB* ReplicationTest::openFollower(Options options) {
480475
return db;
481476
}
482477

483-
size_t ReplicationTest::catchUpFollower(std::optional<size_t> num_records,
484-
bool allow_new_manifest_writes) {
478+
size_t ReplicationTest::catchUpFollower(std::optional<size_t> num_records) {
485479
MutexLock lock(&log_records_mutex_);
486480
DB::ApplyReplicationLogRecordInfo info;
487481
size_t ret = 0;
@@ -496,15 +490,11 @@ size_t ReplicationTest::catchUpFollower(std::optional<size_t> num_records,
496490
[this](Slice) {
497491
return ColumnFamilyOptions(follower_db_->GetOptions());
498492
},
499-
allow_new_manifest_writes,
500493
snapshot_replication_epoch_,
501494
&info, flags);
502495
assert(s.ok());
503496
++ret;
504497
}
505-
if (info.has_new_manifest_writes) {
506-
assert(info.has_manifest_writes);
507-
}
508498
for (auto& cf : info.added_column_families) {
509499
auto inserted =
510500
follower_cfs_.try_emplace(cf->GetName(), std::move(cf)).second;
@@ -1112,20 +1102,6 @@ TEST_F(ReplicationTest, LogNumberDontGoBackwards) {
11121102
EXPECT_GE(logNum, minLogNumberToKeep);
11131103
}
11141104

1115-
TEST_F(ReplicationTest, AllowNewManifestWrite) {
1116-
auto leader = openLeader(), follower = openFollower();
1117-
1118-
ASSERT_OK(leader->Put(wo(), "k1", "v1"));
1119-
ASSERT_OK(leader->Flush({}));
1120-
catchUpFollower(2);
1121-
// The new manifest write won't be applied
1122-
catchUpFollower(1, false);
1123-
uint64_t followerMUS, leaderMUS;
1124-
ASSERT_OK(follower->GetManifestUpdateSequence(&followerMUS));
1125-
ASSERT_OK(leader->GetManifestUpdateSequence(&leaderMUS));
1126-
EXPECT_LT(followerMUS, leaderMUS);
1127-
}
1128-
11291105
// Memtable switch record won't be generated if all memtables are empty
11301106
TEST_F(ReplicationTest, NoMemSwitchRecordIfEmpty) {
11311107
auto leader = openLeader();

db/db_impl/db_impl.cc

+4-13
Original file line numberDiff line numberDiff line change
@@ -1321,15 +1321,12 @@ std::string DescribeVersionEdit(const VersionEdit& e, ColumnFamilyData* cfd) {
13211321
Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
13221322
std::string replication_sequence,
13231323
CFOptionsFactory cf_options_factory,
1324-
bool allow_new_manifest_writes,
13251324
uint64_t snapshot_replication_epoch,
13261325
ApplyReplicationLogRecordInfo* info,
13271326
unsigned flags) {
13281327
JobContext job_context(0, false);
13291328
Status s;
13301329
bool evictObsoleteFiles = flags & AR_EVICT_OBSOLETE_FILES;
1331-
bool enableEpochBasedDivergenceDetection =
1332-
flags & AR_EPOCH_BASED_DIVERGENCE_DETECTION;
13331330

13341331
{
13351332
WriteThread::Writer w;
@@ -1469,8 +1466,7 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
14691466
// at epoch `e` is generated while there is no manifest writes for the
14701467
// latest epoch yet.
14711468
if (latest_applied_update_sequence <= current_update_sequence) {
1472-
if (enableEpochBasedDivergenceDetection &&
1473-
!versions_->IsReplicationEpochsEmpty()) {
1469+
if (!versions_->IsReplicationEpochsEmpty()) {
14741470
auto inferred_epoch_of_mus = versions_->GetReplicationEpochForMUS(
14751471
latest_applied_update_sequence);
14761472
// If mus is smaller than mus in the epoch set, the replication
@@ -1506,9 +1502,9 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
15061502
// don't apply the manifest write if it's already applied
15071503
continue;
15081504
} else {
1509-
if (enableEpochBasedDivergenceDetection &&
1510-
!versions_->IsReplicationEpochsEmpty() &&
1511-
versions_->replication_epochs_.GetLargestEpoch() < snapshot_replication_epoch) {
1505+
if (!versions_->IsReplicationEpochsEmpty() &&
1506+
versions_->replication_epochs_.GetLargestEpoch() <
1507+
snapshot_replication_epoch) {
15121508
// This should be the first mus of `snapshotEpoch`
15131509
if (replication_epoch != snapshot_replication_epoch) {
15141510
info->diverged_manifest_writes = true;
@@ -1523,11 +1519,6 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
15231519
}
15241520
}
15251521

1526-
info->has_new_manifest_writes = true;
1527-
if (!allow_new_manifest_writes) {
1528-
// We don't expect new manifest writes, break early
1529-
break;
1530-
}
15311522
++current_update_sequence;
15321523
if (e.GetManifestUpdateSequence() != current_update_sequence) {
15331524
std::ostringstream oss;

db/db_impl/db_impl.h

-1
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,6 @@ class DBImpl : public DB {
427427
Status ApplyReplicationLogRecord(ReplicationLogRecord record,
428428
std::string replication_sequence,
429429
CFOptionsFactory cf_options_factory,
430-
bool allow_new_manifest_writes,
431430
uint64_t snapshot_replication_epoch,
432431
ApplyReplicationLogRecordInfo* info,
433432
unsigned flags) override;

db/db_test.cc

-1
Original file line numberDiff line numberDiff line change
@@ -3444,7 +3444,6 @@ class ModelDB : public DB {
34443444
Status ApplyReplicationLogRecord(ReplicationLogRecord /*record*/,
34453445
std::string /*replication_sequence*/,
34463446
CFOptionsFactory /* cf_options_factory */,
3447-
bool /* allow_new_manifest_writes */,
34483447
uint64_t /* snapshot_replication_epoch */,
34493448
ApplyReplicationLogRecordInfo* /*info*/,
34503449
unsigned /*flags*/) override {

include/rocksdb/db.h

-14
Original file line numberDiff line numberDiff line change
@@ -1481,13 +1481,6 @@ class DB {
14811481
// if its manifest update sequence number is lower and equal than the DB's
14821482
// manifest update sequence number.
14831483
uint64_t latest_applied_manifest_update_seq{0};
1484-
// If true, the replication event contains manifest writes with manifest
1485-
// update sequence number greater than the DB's manifest update sequence
1486-
// number. The new manifest writes may or may not be applied depending on
1487-
// value of `allow_new_manifest_writes`
1488-
// TODO(wei): remove once epoch based diverged manifest write detection is
1489-
// rolled out
1490-
bool has_new_manifest_writes{false};
14911484
bool diverged_manifest_writes{false};
14921485

14931486
// added_column_families contains column family handles for all column
@@ -1508,21 +1501,14 @@ class DB {
15081501
// options need to be returned. The function is invoked done outside of the DB
15091502
// mutex.
15101503
//
1511-
// If `allow_new_manifest_writes` is false, and there are new
1512-
// manifest writes after DB's manfiest update sequence, this function will set
1513-
// `has_new_manifest_writes_` to true and return early without applying
1514-
// the new manifest writes.
1515-
//
15161504
// REQUIRES: info needs to be provided, can't be nullptr.
15171505
enum ApplyReplicationLogRecordFlags : unsigned {
15181506
AR_EVICT_OBSOLETE_FILES = 1U << 0,
1519-
AR_EPOCH_BASED_DIVERGENCE_DETECTION = 1U << 1,
15201507
};
15211508
using CFOptionsFactory = std::function<ColumnFamilyOptions(Slice)>;
15221509
virtual Status ApplyReplicationLogRecord(ReplicationLogRecord record,
15231510
std::string replication_sequence,
15241511
CFOptionsFactory cf_options_factory,
1525-
bool allow_new_manifest_writes,
15261512
uint64_t snapshot_replication_epoch,
15271513
ApplyReplicationLogRecordInfo* info,
15281514
unsigned flags = 0) = 0;

include/rocksdb/utilities/stackable_db.h

+1-2
Original file line numberDiff line numberDiff line change
@@ -529,13 +529,12 @@ class StackableDB : public DB {
529529
Status ApplyReplicationLogRecord(ReplicationLogRecord record,
530530
std::string replication_sequence,
531531
CFOptionsFactory cf_options_factory,
532-
bool allow_new_manifest_writes,
533532
uint64_t snapshot_replication_epoch,
534533
ApplyReplicationLogRecordInfo* info,
535534
unsigned flags) override {
536535
return db_->ApplyReplicationLogRecord(
537536
record, replication_sequence, std::move(cf_options_factory),
538-
allow_new_manifest_writes, snapshot_replication_epoch, info, flags);
537+
snapshot_replication_epoch, info, flags);
539538
}
540539
Status GetReplicationRecordDebugString(const ReplicationLogRecord& record,
541540
std::string* out) const override {

0 commit comments

Comments
 (0)