Skip to content

Commit cd7551c

Browse files
authored
refactor some Pegasus modifications (#17)
1 parent e17dfe6 commit cd7551c

14 files changed

+32
-50
lines changed

db/column_family.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -1080,12 +1080,12 @@ void ColumnFamilySet::SetPegasusDataVersion(uint32_t version) {
10801080
pegasus_data_version_ = version;
10811081
}
10821082

1083-
uint64_t ColumnFamilySet::GetLastManualCompactFinishTime() {
1083+
uint64_t ColumnFamilySet::GetLastManualCompactFinishTime() const {
10841084
return last_manual_compact_finish_time_;
10851085
}
10861086

10871087
void ColumnFamilySet::SetLastManualCompactFinishTime(uint64_t ms) {
1088-
last_manual_compact_finish_time_ = ms;
1088+
last_manual_compact_finish_time_ = ms;
10891089
}
10901090

10911091
// under a DB mutex AND write thread

db/column_family.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ class ColumnFamilySet {
484484
size_t NumberOfColumnFamilies() const;
485485
uint32_t GetPegasusDataVersion() const;
486486
void SetPegasusDataVersion(uint32_t version);
487-
uint64_t GetLastManualCompactFinishTime();
487+
uint64_t GetLastManualCompactFinishTime() const;
488488
void SetLastManualCompactFinishTime(uint64_t ms);
489489
ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id,
490490
Version* dummy_version,

db/db_filesnapshot.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
148148
Status DBImpl::GetLiveFilesQuick(std::vector<std::string>& ret,
149149
uint64_t* manifest_file_size,
150150
SequenceNumber* last_sequence,
151-
uint64_t* last_decree) {
151+
uint64_t* last_decree) const {
152152
// ATTENTION(laiyingchun): only used for Pegasus.
153153
assert(pegasus_data_);
154154
*manifest_file_size = 0;

db/db_impl.cc

+6-6
Original file line numberDiff line numberDiff line change
@@ -751,7 +751,7 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const {
751751
return versions_->LastSequence();
752752
}
753753

754-
uint64_t DBImpl::GetLastFlushedDecree() {
754+
uint64_t DBImpl::GetLastFlushedDecree() const {
755755
SequenceNumber seq;
756756
uint64_t d;
757757

@@ -771,11 +771,11 @@ uint32_t DBImpl::GetPegasusDataVersion() const {
771771
return version;
772772
}
773773

774-
uint64_t DBImpl::GetLastManualCompactFinishTime() {
775-
mutex_.Lock();
776-
uint64_t ms = versions_->GetColumnFamilySet()->GetLastManualCompactFinishTime();
777-
mutex_.Unlock();
778-
return ms;
774+
uint64_t DBImpl::GetLastManualCompactFinishTime() const {
775+
mutex_.Lock();
776+
uint64_t ms = versions_->GetColumnFamilySet()->GetLastManualCompactFinishTime();
777+
mutex_.Unlock();
778+
return ms;
779779
}
780780

781781
SequenceNumber DBImpl::IncAndFetchSequenceNumber() {

db/db_impl.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -231,11 +231,11 @@ class DBImpl : public DB {
231231
bool HasActiveSnapshotInRange(SequenceNumber lower_bound,
232232
SequenceNumber upper_bound);
233233

234-
virtual uint64_t GetLastFlushedDecree() override;
234+
virtual uint64_t GetLastFlushedDecree() const override;
235235

236236
virtual uint32_t GetPegasusDataVersion() const override;
237237

238-
virtual uint64_t GetLastManualCompactFinishTime() override;
238+
virtual uint64_t GetLastManualCompactFinishTime() const override;
239239

240240
#ifndef ROCKSDB_LITE
241241
using DB::ResetStats;
@@ -250,7 +250,7 @@ class DBImpl : public DB {
250250
virtual Status GetLiveFilesQuick(std::vector<std::string>& ret,
251251
uint64_t* manifest_file_size,
252252
SequenceNumber* last_sequence,
253-
uint64_t* last_decree) override;
253+
uint64_t* last_decree) const override;
254254
virtual Status GetSortedWalFiles(VectorLogPtr& files) override;
255255

256256
virtual Status GetUpdatesSince(

db/db_impl_write.cc

+2-4
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
231231
size_t seq_inc = seq_per_batch_ ? write_group.size : total_count;
232232

233233
const bool concurrent_update = concurrent_prepare_;
234-
235234
// Update stats while we are an exclusive group leader, so we know
236235
// that nobody else can be writing to these particular stats.
237236
// We're optimistic, updating the stats before we successfully
@@ -278,9 +277,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
278277
}
279278
}
280279
assert(last_sequence != kMaxSequenceNumber);
281-
const SequenceNumber current_sequence = write_options.given_sequence_number == 0 ?
282-
(last_sequence + 1) : write_options.given_sequence_number;
283-
last_sequence = current_sequence + seq_inc - 1;
280+
const SequenceNumber current_sequence = last_sequence + 1;
281+
last_sequence += seq_inc;
284282

285283
if (status.ok()) {
286284
PERF_TIMER_GUARD(write_memtable_time);

db/memtable.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ class MemTable {
368368
return oldest_key_time_.load(std::memory_order_relaxed);
369369
}
370370

371-
void GetLastSeqDecree(SequenceNumber* sequence, uint64_t* decree) {
371+
void GetLastSeqDecree(SequenceNumber* sequence, uint64_t* decree) const {
372372
*sequence = last_sequence_;
373373
*decree = last_decree_;
374374
}

db/version_edit.h

+1-5
Original file line numberDiff line numberDiff line change
@@ -217,11 +217,7 @@ class VersionEdit {
217217
return last_sequence_;
218218
}
219219

220-
bool HasLastFlushSeqDecree() const {
221-
return has_last_flush_seq_decree_;
222-
}
223-
224-
void GetLastFlushSeqDecree(SequenceNumber* sequence, uint64_t* decree) {
220+
void GetLastFlushSeqDecree(SequenceNumber* sequence, uint64_t* decree) const {
225221
*sequence = last_flush_sequence_;
226222
*decree = last_flush_decree_;
227223
}

db/version_set.cc

+4-4
Original file line numberDiff line numberDiff line change
@@ -2478,7 +2478,7 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
24782478
SequenceNumber seq;
24792479
uint64_t d;
24802480
current->GetLastFlushSeqDecree(&seq, &d);
2481-
v->UpdateLastFlushSeqDecree(seq, d);
2481+
v->UpdateLastFlushSeqDecreeIfNeeded(seq, d);
24822482
}
24832483
current->Unref();
24842484
}
@@ -2725,7 +2725,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
27252725
SequenceNumber seq;
27262726
uint64_t d;
27272727
e->GetLastFlushSeqDecree(&seq, &d);
2728-
v->UpdateLastFlushSeqDecree(seq, d);
2728+
v->UpdateLastFlushSeqDecreeIfNeeded(seq, d);
27292729
}
27302730
}
27312731
if (max_log_number_in_batch != 0) {
@@ -3153,7 +3153,7 @@ Status VersionSet::Recover(
31533153
if (db_options_->pegasus_data) {
31543154
// update last flush sequence/decree
31553155
auto &p = last_flush_seq_decree_map[cfd->GetID()];
3156-
v->UpdateLastFlushSeqDecree(p.first, p.second);
3156+
v->UpdateLastFlushSeqDecreeIfNeeded(p.first, p.second);
31573157
}
31583158

31593159
// Install recovered version
@@ -3547,7 +3547,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
35473547
builder->SaveTo(v->storage_info());
35483548

35493549
auto& p = last_flush_seq_decree_map[cfd->GetID()];
3550-
v->UpdateLastFlushSeqDecree(p.first, p.second);
3550+
v->UpdateLastFlushSeqDecreeIfNeeded(p.first, p.second);
35513551

35523552
v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
35533553

db/version_set.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -617,12 +617,12 @@ class Version {
617617

618618
void GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta);
619619

620-
void GetLastFlushSeqDecree(SequenceNumber* sequence, uint64_t* decree) {
620+
void GetLastFlushSeqDecree(SequenceNumber* sequence, uint64_t* decree) const {
621621
*sequence = last_flush_sequence_;
622622
*decree = last_flush_decree_;
623623
}
624624

625-
void UpdateLastFlushSeqDecree(SequenceNumber sequence, uint64_t decree) {
625+
void UpdateLastFlushSeqDecreeIfNeeded(SequenceNumber sequence, uint64_t decree) {
626626
if (sequence > last_flush_sequence_) {
627627
assert(decree >= last_flush_decree_);
628628
last_flush_sequence_ = sequence;
@@ -795,7 +795,7 @@ class VersionSet {
795795
}
796796

797797
// Return the last flush sequence number of default column family.
798-
uint64_t LastFlushSequence() {
798+
uint64_t LastFlushSequence() const {
799799
assert(db_options_->pegasus_data);
800800
assert(column_family_set_->NumberOfColumnFamilies() == 1u);
801801
SequenceNumber seq;

include/rocksdb/db.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -884,12 +884,12 @@ class DB {
884884
virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) = 0;
885885

886886
// The last flushed decree.
887-
virtual uint64_t GetLastFlushedDecree() { return 0; }
887+
virtual uint64_t GetLastFlushedDecree() const { return 0; }
888888

889889
// The Pegasus data version.
890890
virtual uint32_t GetPegasusDataVersion() const { return 0; }
891891

892-
virtual uint64_t GetLastManualCompactFinishTime() { return 0; }
892+
virtual uint64_t GetLastManualCompactFinishTime() const { return 0; }
893893

894894
#ifndef ROCKSDB_LITE
895895

@@ -933,7 +933,7 @@ class DB {
933933
virtual Status GetLiveFilesQuick(std::vector<std::string>& ret,
934934
uint64_t* manifest_file_size,
935935
SequenceNumber* last_sequence,
936-
uint64_t* last_decree) { return Status::NotSupported(); }
936+
uint64_t* last_decree) const { return Status::NotSupported(); }
937937

938938
// Retrieve the sorted list of all wal files with earliest file first
939939
virtual Status GetSortedWalFiles(VectorLogPtr& files) = 0;

include/rocksdb/options.h

-9
Original file line numberDiff line numberDiff line change
@@ -1157,14 +1157,6 @@ struct WriteOptions {
11571157
// Default: false
11581158
bool low_pri;
11591159

1160-
// Sequence number is usually controlled by the db itself as 1,2,3, ...
1161-
// however, in cases where the upper frameworks (e.g., replication), the sequence
1162-
// number is given and the underlying db should use this given sequence number directly
1163-
// instead of generating one by itself.
1164-
//
1165-
// Default: 0 (rocksdb should generate the number by itself in this case)
1166-
SequenceNumber given_sequence_number;
1167-
11681160
// Decree is an value affiliated to the write.
11691161
uint64_t given_decree;
11701162

@@ -1174,7 +1166,6 @@ struct WriteOptions {
11741166
ignore_missing_column_families(false),
11751167
no_slowdown(false),
11761168
low_pri(false),
1177-
given_sequence_number(0),
11781169
given_decree(0) {}
11791170
};
11801171

include/rocksdb/write_batch.h

-3
Original file line numberDiff line numberDiff line change
@@ -356,9 +356,6 @@ class WriteBatch : public WriteBatchBase {
356356
protected:
357357
std::string rep_; // See comment in write_batch.cc for the format of rep_
358358

359-
bool use_shared_sequence_number_; // when seq is given and shared by multiple write ops,
360-
// see more comments for WriteOptions
361-
362359
// Intentionally copyable
363360
};
364361

utilities/checkpoint/checkpoint_impl.cc

+5-5
Original file line numberDiff line numberDiff line change
@@ -310,10 +310,10 @@ Status CheckpointImpl::CreateCustomCheckpoint(
310310
}
311311

312312
struct LogReporter : public log::Reader::Reporter {
313-
Status* status;
313+
Status* status = nullptr;
314314

315-
virtual void Corruption(size_t bytes, const Status& s) override {
316-
if (this->status->ok()) *this->status = s;
315+
void Corruption(size_t bytes, const Status& s) override {
316+
if (this->status && this->status->ok()) *this->status = s;
317317
}
318318
};
319319

@@ -484,8 +484,8 @@ Status CheckpointImpl::CreateCheckpointQuick(const std::string& checkpoint_dir,
484484
db_->EnableFileDeletions(false);
485485

486486
if (s.ok()) {
487-
// modify menifest file to set correct last_seq in VersionEdit, because
488-
// the last_seq recorded in menifest may be greater than the real value
487+
// modify manifest file to set correct last_seq in VersionEdit, because
488+
// the last_seq recorded in manifest may be greater than the real value
489489
assert(!manifest_file_path.empty());
490490
s = ModifyManifestFileLastSeq(db_->GetEnv(), db_->GetOptions(),
491491
manifest_file_path, last_sequence);

0 commit comments

Comments
 (0)