Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist updates between server restarts #1903

Merged
merged 23 commits into from
Mar 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9a0f3d4
Implement very basic update persistence
RobinTF Mar 26, 2025
7ee415c
Fix segfault and typo
RobinTF Mar 26, 2025
562c3c1
Store mapping within serialization format
RobinTF Mar 26, 2025
db09b68
Use better view for joining
RobinTF Mar 26, 2025
f0c5e0b
Fix the assertion, and add an option for the persisting.
joka921 Mar 27, 2025
4dd726a
Address Hannah's proposed changes.
joka921 Mar 27, 2025
7869ca4
Refactor to use the serialization library and start writing some tests.
joka921 Mar 27, 2025
c15a5b8
Merge remote-tracking branch 'origin/master' into persistent-updates
Mar 27, 2025
e78d984
Fix option name and log message
Mar 27, 2025
9a4710a
Revise another log message
Mar 27, 2025
f331f94
Some additional cleanup and fix some sonarcloud issues.
joka921 Mar 28, 2025
7900081
Merge remote-tracking branch 'ad-freiburg/master' into persistent-upd…
RobinTF Mar 28, 2025
3e1ee75
Fix segfault
RobinTF Mar 28, 2025
c4b3d2a
Adjust magic bytes
RobinTF Mar 31, 2025
6b7fda5
Add basic unit tests
RobinTF Mar 31, 2025
2ec7fec
Address some PR comments
RobinTF Mar 31, 2025
25c3f31
Fix typo
RobinTF Mar 31, 2025
377c788
Add more unit tests and fix compilation error
RobinTF Mar 31, 2025
8734c28
Correctly include header where actually required
RobinTF Mar 31, 2025
9b6d16a
Fix formatting
RobinTF Mar 31, 2025
73f573c
Merge branch 'master' into persistent-updates
joka921 Mar 31, 2025
5c784f7
Fix the duplicate reading of the DeltaTriples.
joka921 Mar 31, 2025
0813cc7
Fix the test again.
joka921 Mar 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/ServerMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ int main(int argc, char** argv) {
bool noPatterns;
bool noPatternTrick;
bool onlyPsoAndPosPermutations;
bool persistUpdates;

ad_utility::MemorySize memoryMaxSize;

Expand Down Expand Up @@ -126,6 +127,9 @@ int main(int argc, char** argv) {
optionFactory.getProgramOption<"request-body-limit">(),
"Set the maximum size for the body of requests the server will process. "
"Set to zero to disable the limit.");
add("persist-updates", po::bool_switch(&persistUpdates),
"If set, then SPARQL UPDATES will be persisted on disk. Otherwise they "
"will be lost when the engine is stopped");
po::variables_map optionsMap;

try {
Expand All @@ -148,7 +152,8 @@ int main(int argc, char** argv) {
try {
Server server(port, numSimultaneousQueries, memoryMaxSize,
std::move(accessToken), !noPatternTrick);
server.run(indexBasename, text, !noPatterns, !onlyPsoAndPosPermutations);
server.run(indexBasename, text, !noPatterns, !onlyPsoAndPosPermutations,
persistUpdates);
} catch (const std::exception& e) {
// This code should never be reached as all exceptions should be handled
// within server.run()
Expand Down
2 changes: 2 additions & 0 deletions src/engine/ExecuteUpdate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ UpdateMetadata ExecuteUpdate::executeUpdate(
deltaTriples.insertTriples(cancellationHandle,
std::move(toInsert.idTriples_));
metadata.insertionTime_ = timer.msecs();
// TODO add timing information to metadata
deltaTriples.writeToDisk();
return metadata;
}

Expand Down
3 changes: 2 additions & 1 deletion src/engine/LocalVocab.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ class LocalVocab {
// for testing.
std::vector<LocalVocabEntry> getAllWordsForTesting() const;

const Set& primaryWordSet() const { return *primaryWordSet_; }

// Get a new BlankNodeIndex using the LocalBlankNodeManager.
[[nodiscard]] BlankNodeIndex getBlankNodeIndex(
ad_utility::BlankNodeManager* blankNodeManager);
Expand All @@ -176,7 +178,6 @@ class LocalVocab {
private:
// Accessors for the primary set.
Set& primaryWordSet() { return *primaryWordSet_; }
const Set& primaryWordSet() const { return *primaryWordSet_; }

// Common implementation for the two methods `getIndexAndAddIfNotContained`
// and `getIndexOrNullopt` above.
Expand Down
10 changes: 6 additions & 4 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,15 @@

// __________________________________________________________________________
void Server::initialize(const string& indexBaseName, bool useText,
bool usePatterns, bool loadAllPermutations) {
bool usePatterns, bool loadAllPermutations,
bool persistUpdates) {

Check warning on line 70 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L70

Added line #L70 was not covered by tests
LOG(INFO) << "Initializing server ..." << std::endl;

index_.usePatterns() = usePatterns;
index_.loadAllPermutations() = loadAllPermutations;

// Init the index.
index_.createFromOnDiskIndex(indexBaseName);
index_.createFromOnDiskIndex(indexBaseName, persistUpdates);

Check warning on line 77 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L77

Added line #L77 was not covered by tests
if (useText) {
index_.addTextFromOnDiskIndex();
}
Expand All @@ -88,7 +89,7 @@

// _____________________________________________________________________________
void Server::run(const string& indexBaseName, bool useText, bool usePatterns,
bool loadAllPermutations) {
bool loadAllPermutations, bool persistUpdates) {

Check warning on line 92 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L92

Added line #L92 was not covered by tests
using namespace ad_utility::httpUtils;

// Function that handles a request asynchronously, will be passed as argument
Expand Down Expand Up @@ -163,7 +164,8 @@
std::move(webSocketSessionSupplier)};

// Initialize the index
initialize(indexBaseName, useText, usePatterns, loadAllPermutations);
initialize(indexBaseName, useText, usePatterns, loadAllPermutations,
persistUpdates);

Check warning on line 168 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L167-L168

Added lines #L167 - L168 were not covered by tests

LOG(INFO) << "The server is ready, listening for requests on port "
<< std::to_string(httpServer.getPort()) << " ..." << std::endl;
Expand Down
5 changes: 3 additions & 2 deletions src/engine/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,14 @@ class Server {
private:
//! Initialize the server.
void initialize(const string& indexBaseName, bool useText,
bool usePatterns = true, bool loadAllPermutations = true);
bool usePatterns = true, bool loadAllPermutations = true,
bool persistUpdates = false);

public:
//! First initialize the server. Then loop, wait for requests and trigger
//! processing. This method never returns except when throwing an exception.
void run(const string& indexBaseName, bool useText, bool usePatterns = true,
bool loadAllPermutations = true);
bool loadAllPermutations = true, bool persistUpdates = false);

Index& index() { return index_; }
const Index& index() const { return index_; }
Expand Down
98 changes: 90 additions & 8 deletions src/index/DeltaTriples.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

#include "index/DeltaTriples.h"

#include "absl/strings/str_cat.h"
#include <absl/strings/str_cat.h>

#include "index/Index.h"
#include "index/IndexImpl.h"
#include "index/LocatedTriples.h"
#include "util/Serializer/TripleSerializer.h"

// ____________________________________________________________________________
LocatedTriples::iterator& DeltaTriples::LocatedTripleHandles::forPermutation(
Expand Down Expand Up @@ -218,38 +220,48 @@
// _____________________________________________________________________________
template <typename ReturnType>
ReturnType DeltaTriplesManager::modify(
const std::function<ReturnType(DeltaTriples&)>& function) {
const std::function<ReturnType(DeltaTriples&)>& function,
bool writeToDiskAfterRequest) {
// While holding the lock for the underlying `DeltaTriples`, perform the
// actual `function` (typically some combination of insert and delete
// operations) and (while still holding the lock) update the
// `currentLocatedTriplesSnapshot_`.
return deltaTriples_.withWriteLock(
[this, &function](DeltaTriples& deltaTriples) {
[this, &function, writeToDiskAfterRequest](DeltaTriples& deltaTriples) {
auto updateSnapshot = [this, &deltaTriples] {
auto newSnapshot = deltaTriples.getSnapshot();
currentLocatedTriplesSnapshot_.withWriteLock(
[&newSnapshot](auto& currentSnapshot) {
currentSnapshot = std::move(newSnapshot);
});
};
auto writeAndUpdateSnapshot = [&updateSnapshot, &deltaTriples,
writeToDiskAfterRequest]() {
if (writeToDiskAfterRequest) {
deltaTriples.writeToDisk();
}
updateSnapshot();
};

if constexpr (std::is_void_v<ReturnType>) {
function(deltaTriples);
updateSnapshot();
writeAndUpdateSnapshot();

Check warning on line 248 in src/index/DeltaTriples.cpp

View check run for this annotation

Codecov / codecov/patch

src/index/DeltaTriples.cpp#L248

Added line #L248 was not covered by tests
} else {
ReturnType returnValue = function(deltaTriples);
updateSnapshot();
writeAndUpdateSnapshot();

Check warning on line 251 in src/index/DeltaTriples.cpp

View check run for this annotation

Codecov / codecov/patch

src/index/DeltaTriples.cpp#L251

Added line #L251 was not covered by tests
return returnValue;
}
});
}
// Explicit instantiations
template void DeltaTriplesManager::modify<void>(
std::function<void(DeltaTriples&)> const&);
std::function<void(DeltaTriples&)> const&, bool writeToDiskAfterRequest);
template nlohmann::json DeltaTriplesManager::modify<nlohmann::json>(
const std::function<nlohmann::json(DeltaTriples&)>&);
const std::function<nlohmann::json(DeltaTriples&)>&,
bool writeToDiskAfterRequest);
template DeltaTriplesCount DeltaTriplesManager::modify<DeltaTriplesCount>(
const std::function<DeltaTriplesCount(DeltaTriples&)>&);
const std::function<DeltaTriplesCount(DeltaTriples&)>&,
bool writeToDiskAfterRequest);

// _____________________________________________________________________________
void DeltaTriplesManager::clear() { modify<void>(&DeltaTriples::clear); }
Expand All @@ -267,3 +279,73 @@
.at(static_cast<size_t>(permutation))
.setOriginalMetadata(std::move(metadata));
}

// _____________________________________________________________________________
// Serialize the local vocab together with the deleted and the inserted triples
// to the file configured via `setPersists()`. Does nothing if no filename is
// set.
void DeltaTriples::writeToDisk() const {
  if (!filenameForPersisting_) {
    return;
  }
  const std::string& targetFilename = *filenameForPersisting_;

  // Wrap the keys (the triples) of a `TriplesToHandlesMap` into a
  // `SizedJoinView` over the ID arrays of the individual triples.
  auto joinedIds = [](const TriplesToHandlesMap& triplesToHandles) {
    auto idsOfTriple =
        [](const IdTriple<0>& idTriple) -> const std::array<Id, 4>& {
      return idTriple.ids_;
    };
    return ad_utility::SizedJoinView{triplesToHandles | ql::views::keys |
                                     ql::views::transform(idsOfTriple)};
  };

  // First write everything to a temporary file next to the target, and only
  // rename it to the actual target once the serialization has returned.
  std::filesystem::path temporaryFile{targetFilename};
  temporaryFile += ".tmp";
  // The order matters: deleted triples first, inserted triples second.
  ad_utility::serializeIds(
      temporaryFile, localVocab_,
      std::array{joinedIds(triplesDeleted_), joinedIds(triplesInserted_)});
  std::filesystem::rename(temporaryFile, targetFilename);
}

// _____________________________________________________________________________
// Restore the delta triples from the file configured via `setPersists()`.
// Does nothing if no filename is set or if no ID ranges could be read.
void DeltaTriples::readFromDisk() {
if (!filenameForPersisting_.has_value()) {
return;
}

Check warning on line 308 in src/index/DeltaTriples.cpp

View check run for this annotation

Codecov / codecov/patch

src/index/DeltaTriples.cpp#L307-L308

Added lines #L307 - L308 were not covered by tests
// Restoring is only allowed while the local vocab is still empty.
AD_CONTRACT_CHECK(localVocab_.empty());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All members should be empty.

// `vocab` is not referenced again below.
// NOTE(review): presumably it keeps the deserialized local-vocab entries alive
// while the triples are re-inserted -- confirm.
auto [vocab, idRanges] = ad_utility::deserializeIds(
filenameForPersisting_.value(), index_.getBlankNodeManager());
// No ID ranges were read, so there is nothing to restore.
if (idRanges.empty()) {
return;
}
// `writeToDisk()` stores exactly two ranges: the deleted triples at index 0
// and the inserted triples at index 1.
AD_CORRECTNESS_CHECK(idRanges.size() == 2);
// Convert a flat list of IDs into triples, consuming `cols` IDs per triple.
auto toTriples = [](const std::vector<Id>& ids) {
Triples triples;
// The triples carry no payload, so the IDs are all that has to be restored.
static_assert(std::tuple_size_v<
decltype(std::declval<Triples::value_type>().payload_)> ==
0);
constexpr size_t cols = Triples::value_type::NumCols;
AD_CORRECTNESS_CHECK(ids.size() % cols == 0);
triples.reserve(ids.size() / cols);
// NOTE(review): four IDs are hard-coded per triple although the step width is
// `cols`; this assumes `NumCols == 4` -- confirm.
for (size_t i = 0; i < ids.size(); i += cols) {
triples.emplace_back(
std::array{ids[i], ids[i + 1], ids[i + 2], ids[i + 3]});
}
return triples;
};
// Create a new cancellation handle for the two calls below.
auto cancellationHandle =
std::make_shared<CancellationHandle::element_type>();
// Re-apply the inserted triples (index 1) first, then the deleted ones
// (index 0).
insertTriples(cancellationHandle, toTriples(idRanges.at(1)));
deleteTriples(cancellationHandle, toTriples(idRanges.at(0)));
// NOTE(review): `idRanges.at(i).size()` counts IDs, not triples (each triple
// consumes `cols` IDs), so the logged numbers look like `cols` times the
// actual number of triples -- confirm.
AD_LOG_INFO << "Done, #inserted triples = " << idRanges.at(1).size()
<< ", #deleted triples = " << idRanges.at(0).size() << std::endl;
}
// _____________________________________________________________________________
// Set (or, with `std::nullopt`, unset) the filename that `writeToDisk()` and
// `readFromDisk()` operate on.
void DeltaTriples::setPersists(std::optional<std::string> filename) {
  // After the swap, the previous value lives in the by-value parameter and is
  // destroyed when this function returns.
  filenameForPersisting_.swap(filename);
}

// _____________________________________________________________________________
void DeltaTriplesManager::setFilenameForPersistentUpdatesAndReadFromDisk(
std::string filename) {
modify<void>(
[&filename](DeltaTriples& deltaTriples) {
deltaTriples.setPersists(std::move(filename));
deltaTriples.readFromDisk();
},
false);
}

Check warning on line 351 in src/index/DeltaTriples.cpp

View check run for this annotation

Codecov / codecov/patch

src/index/DeltaTriples.cpp#L344-L351

Added lines #L344 - L351 were not covered by tests
24 changes: 22 additions & 2 deletions src/index/DeltaTriples.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class DeltaTriples {
FRIEND_TEST(DeltaTriplesTest, insertTriplesAndDeleteTriples);
FRIEND_TEST(DeltaTriplesTest, clear);
FRIEND_TEST(DeltaTriplesTest, addTriplesToLocalVocab);
FRIEND_TEST(DeltaTriplesTest, storeAndRestoreData);

public:
using Triples = std::vector<IdTriple<0>>;
Expand All @@ -90,6 +91,9 @@ class DeltaTriples {
// which are not contained in the vocabulary of the original index).
LocalVocab localVocab_;

// See the documentation of `setPersists()` below.
std::optional<std::string> filenameForPersisting_;

// Assert that the Permutation Enum values have the expected int values.
// This is used to store and lookup items that exist for permutation in an
// array.
Expand Down Expand Up @@ -122,7 +126,7 @@ class DeltaTriples {
public:
// Construct for given index.
explicit DeltaTriples(const Index& index);
explicit DeltaTriples(const IndexImpl& index) : index_{index} {};
explicit DeltaTriples(const IndexImpl& index) : index_{index} {}

// Disable accidental copying.
DeltaTriples(const DeltaTriples&) = delete;
Expand Down Expand Up @@ -161,6 +165,17 @@ class DeltaTriples {
// Delete triples.
void deleteTriples(CancellationHandle cancellationHandle, Triples triples);

// If the `filename` is set, then `writeToDisk()` will write these
// `DeltaTriples` to `filename.value()`. If `filename` is `nullopt`, then
// `writeToDisk()` will be a no-op.
void setPersists(std::optional<std::string> filename);

// Write the delta triples to disk to persist them between restarts.
void writeToDisk() const;

// Read the delta triples from disk to restore them after a restart.
void readFromDisk();

// Return a deep copy of the `LocatedTriples` and the corresponding
// `LocalVocab` which form a snapshot of the current status of this
// `DeltaTriples` object.
Expand Down Expand Up @@ -209,6 +224,8 @@ class DeltaTriples {
// delete the respective entry in `triplesInserted_` or `triplesDeleted_`,
// which stores these iterators.
void eraseTripleInAllPermutations(LocatedTripleHandles& handles);

friend class DeltaTriplesManager;
};

// This class synchronizes the access to a `DeltaTriples` object, thus avoiding
Expand All @@ -231,7 +248,10 @@ class DeltaTriplesManager {
// snapshot before or after a modification, but never one of an ongoing
// modification.
template <typename ReturnType>
ReturnType modify(const std::function<ReturnType(DeltaTriples&)>& function);
ReturnType modify(const std::function<ReturnType(DeltaTriples&)>& function,
bool writeToDiskAfterRequest = true);

void setFilenameForPersistentUpdatesAndReadFromDisk(std::string filename);

// Reset the updates represented by the underlying `DeltaTriples` and then
// update the current snapshot.
Expand Down
5 changes: 3 additions & 2 deletions src/index/Index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ Index::Index(Index&&) noexcept = default;
Index::~Index() = default;

// ____________________________________________________________________________
void Index::createFromOnDiskIndex(const std::string& onDiskBase) {
pimpl_->createFromOnDiskIndex(onDiskBase);
void Index::createFromOnDiskIndex(const std::string& onDiskBase,
bool persistUpdatesOnDisk) {
pimpl_->createFromOnDiskIndex(onDiskBase, persistUpdatesOnDisk);
}

// ____________________________________________________________________________
Expand Down
3 changes: 2 additions & 1 deletion src/index/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ class Index {
// constructed using the `createFromFile` method which is typically called via
// `IndexBuilderMain`. Read necessary metadata into memory and open file
// handles.
void createFromOnDiskIndex(const std::string& onDiskBase);
void createFromOnDiskIndex(const std::string& onDiskBase,
bool persistUpdatesOnDisk);

// Add a text index to a complete KB index. First read the given context
// file (if file name not empty), then add words from literals (if true).
Expand Down
20 changes: 15 additions & 5 deletions src/index/IndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,8 @@
}

// _____________________________________________________________________________
void IndexImpl::createFromOnDiskIndex(const string& onDiskBase) {
void IndexImpl::createFromOnDiskIndex(const string& onDiskBase,
bool persistUpdatesOnDisk) {
setOnDiskBase(onDiskBase);
readConfiguration();
vocab_.readFromFile(onDiskBase_ + VOCAB_SUFFIX);
Expand All @@ -893,11 +894,16 @@
// TODO<joka921> We could delegate the setting of the metadata to the
// `Permutation` class, but we first have to deal with the delta triples for
// the additional permutations.
// The setting of the metadata doesn't affect the contents of the delta
// triples, so we don't need to call `writeToDisk`, therefore the second
// argument to `modify` is `false`.
auto setMetadata = [this](const Permutation& p) {
deltaTriplesManager().modify<void>([&p](DeltaTriples& deltaTriples) {
deltaTriples.setOriginalMetadata(p.permutation(),
p.metaData().blockDataShared());
});
deltaTriplesManager().modify<void>(
[&p](DeltaTriples& deltaTriples) {
deltaTriples.setOriginalMetadata(p.permutation(),
p.metaData().blockDataShared());
},
false);
};

auto load = [this, &isInternalId, &setMetadata](
Expand Down Expand Up @@ -941,6 +947,10 @@
usePatterns_ = false;
}
}
if (persistUpdatesOnDisk) {
deltaTriples_.value().setFilenameForPersistentUpdatesAndReadFromDisk(
onDiskBase + ".update-triples");
}

Check warning on line 953 in src/index/IndexImpl.cpp

View check run for this annotation

Codecov / codecov/patch

src/index/IndexImpl.cpp#L951-L953

Added lines #L951 - L953 were not covered by tests
}

// _____________________________________________________________________________
Expand Down
3 changes: 2 additions & 1 deletion src/index/IndexImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ class IndexImpl {

// Creates an index object from an on disk index that has previously been
// constructed. Read necessary meta data into memory and opens file handles.
void createFromOnDiskIndex(const string& onDiskBase);
void createFromOnDiskIndex(const string& onDiskBase,
bool persistUpdatesOnDisk);

// Adds a text index to a complete KB index. Reads words from the given
// wordsfile and calculates bm25 scores with the docsfile if given.
Expand Down
Loading
Loading