Skip to content

Commit

Permalink
db: provide SnapshotRepository to DataModel (#2440)
Browse files Browse the repository at this point in the history
* no more DataModel::set_snapshot_repository singleton
* BodyFindByBlockNumMultiQuery: useful without RoTx
* DataStoreRef: source of creating DataModel queries for kv::api, sync and daemon
* Buffer works with BufferDataModel
* ExecutionPipeline accepts StageContainerFactory
* stages and forks use DataModelFactory
* SnapshotRepository: init with path instead of settings
  • Loading branch information
battlmonstr authored Oct 25, 2024
1 parent ec8498e commit 213f140
Show file tree
Hide file tree
Showing 120 changed files with 1,046 additions and 534 deletions.
6 changes: 1 addition & 5 deletions cmd/capi/execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,15 +430,11 @@ int main(int argc, char* argv[]) {
return init_status_code;
}

// Add snapshots to Silkworm API library
SnapshotSettings snapshot_settings{};
snapshot_settings.repository_dir = data_dir.snapshots().path();

int status_code = -1;
if (settings.execute_blocks_settings) {
// Execute specified block range using Silkworm API library
SnapshotRepository repository{
snapshot_settings,
data_dir.snapshots().path(),
std::make_unique<StepToBlockNumConverter>(),
std::make_unique<db::SnapshotBundleFactoryImpl>(),
};
Expand Down
7 changes: 3 additions & 4 deletions cmd/dev/check_changes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,12 @@ int main(int argc, char* argv[]) {
}

snapshots::SnapshotRepository repository{
snapshots::SnapshotSettings{},
data_dir.snapshots().path(),
std::make_unique<snapshots::StepToBlockNumConverter>(),
std::make_unique<db::SnapshotBundleFactoryImpl>(),
};
repository.reopen_folder();
db::DataModel::set_snapshot_repository(&repository);
db::DataModel access_layer{txn};
db::DataModel access_layer{txn, repository};

AnalysisCache analysis_cache{/*max_size=*/5'000};
std::vector<Receipt> receipts;
Expand All @@ -130,7 +129,7 @@ int main(int argc, char* argv[]) {
break;
}

db::Buffer buffer{txn};
db::Buffer buffer{txn, std::make_unique<db::BufferFullDataModel>(access_layer)};
buffer.set_historical_block(block_num);

ExecutionProcessor processor{block, *rule_set, buffer, *chain_config};
Expand Down
49 changes: 37 additions & 12 deletions cmd/dev/db_toolbox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@
#include <silkworm/infra/test_util/task_runner.hpp>
#include <silkworm/node/stagedsync/execution_pipeline.hpp>
#include <silkworm/node/stagedsync/stages/stage_bodies.hpp>
#include <silkworm/node/stagedsync/stages/stage_bodies_factory.hpp>
#include <silkworm/node/stagedsync/stages/stage_interhashes/trie_cursor.hpp>
#include <silkworm/node/stagedsync/stages_factory_impl.hpp>

#include "../common/common.hpp"

Expand Down Expand Up @@ -562,6 +564,29 @@ void do_stage_set(EnvConfig& config, const std::string& stage_name, uint32_t new
std::cout << "\n Stage " << stage_name << " touched from " << old_height << " to " << new_height << "\n\n";
}

static silkworm::stagedsync::BodiesStageFactory make_bodies_stage_factory(
const silkworm::ChainConfig& chain_config,
silkworm::db::DataModelFactory data_model_factory) {
return [chain_config, data_model_factory = std::move(data_model_factory)](silkworm::stagedsync::SyncContext* sync_context) {
return std::make_unique<silkworm::stagedsync::BodiesStage>(
sync_context,
chain_config,
data_model_factory,
[] { return 0; });
};
};

static silkworm::stagedsync::StageContainerFactory make_stages_factory(
const silkworm::NodeSettings& node_settings,
silkworm::db::DataModelFactory data_model_factory) {
auto bodies_stage_factory = make_bodies_stage_factory(*node_settings.chain_config, data_model_factory);
return silkworm::stagedsync::StagesFactoryImpl::to_factory({
node_settings,
std::move(data_model_factory),
std::move(bodies_stage_factory),
});
}

void unwind(EnvConfig& config, BlockNum unwind_point, bool remove_blocks) {
ensure(config.exclusive, "Function requires exclusive access to database");

Expand All @@ -572,25 +597,29 @@ void unwind(EnvConfig& config, BlockNum unwind_point, bool remove_blocks) {
auto chain_config{read_chain_config(txn)};
ensure(chain_config.has_value(), "Not an initialized Silkworm db or unknown/custom chain");

auto data_directory = std::make_unique<DataDirectory>();
snapshots::SnapshotRepository repository{
data_directory->snapshots().path(),
std::make_unique<snapshots::StepToBlockNumConverter>(),
std::make_unique<SnapshotBundleFactoryImpl>(),
};
db::DataModelFactory data_model_factory = [&](db::ROTxn& tx) { return db::DataModel{tx, repository}; };

boost::asio::io_context io_context;

NodeSettings settings{
.data_directory = std::make_unique<DataDirectory>(),
.data_directory = std::move(data_directory),
.chaindata_env_config = config,
.chain_config = chain_config};

stagedsync::BodiesStageFactory bodies_stage_factory = [&](stagedsync::SyncContext* sync_context) {
return std::make_unique<stagedsync::BodiesStage>(sync_context, *settings.chain_config, [] { return 0; });
};

stagedsync::TimerFactory log_timer_factory = [&](std::function<bool()> callback) {
return std::make_shared<Timer>(io_context.get_executor(), settings.sync_loop_log_interval_seconds * 1000, std::move(callback));
};

stagedsync::ExecutionPipeline stage_pipeline{
&settings,
data_model_factory,
std::move(log_timer_factory),
std::move(bodies_stage_factory),
make_stages_factory(settings, data_model_factory),
};
const auto unwind_result{stage_pipeline.unwind(txn, unwind_point)};

Expand Down Expand Up @@ -2237,16 +2266,12 @@ void do_freeze(EnvConfig& config, const DataDirectory& data_dir, bool keep_block
auto env = open_env(config);
StageSchedulerAdapter stage_scheduler{RWAccess{env}};

snapshots::SnapshotSettings settings;
settings.repository_dir = data_dir.snapshots().path();
settings.no_downloader = true;
snapshots::SnapshotRepository repository{
std::move(settings),
data_dir.snapshots().path(),
std::make_unique<snapshots::StepToBlockNumConverter>(),
std::make_unique<SnapshotBundleFactoryImpl>(),
};
repository.reopen_folder();
DataModel::set_snapshot_repository(&repository);

Freezer freezer{ROAccess{env}, repository, stage_scheduler, data_dir.temp().path(), keep_blocks};

Expand Down
2 changes: 1 addition & 1 deletion cmd/dev/scan_txs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ int main(int argc, char* argv[]) {
break;
}

db::Buffer buffer{txn};
db::Buffer buffer{txn, std::make_unique<db::BufferROTxDataModel>(txn)};
buffer.set_historical_block(block_num);

ExecutionProcessor processor{block, *rule_set, buffer, *chain_config};
Expand Down
25 changes: 21 additions & 4 deletions cmd/dev/snapshots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,9 @@ void decode_segment(const SnapshotSubcommandSettings& settings, int repetitions)
SILK_INFO << "Decode snapshot elapsed: " << as_milliseconds(elapsed) << " msec";
}

static SnapshotRepository make_repository(SnapshotSettings settings) {
static SnapshotRepository make_repository(const SnapshotSettings& settings) {
return SnapshotRepository{
std::move(settings),
settings.repository_dir,
std::make_unique<snapshots::StepToBlockNumConverter>(),
std::make_unique<silkworm::db::SnapshotBundleFactoryImpl>(),
};
Expand Down Expand Up @@ -1056,10 +1056,27 @@ void sync(const SnapshotSettings& settings) {
TemporaryDirectory tmp_dir;
db::EnvConfig chaindata_env_config{tmp_dir.path()};
auto chaindata_env = db::open_env(chaindata_env_config);

SnapshotRepository repository{
settings.repository_dir,
std::make_unique<StepToBlockNumConverter>(),
std::make_unique<db::SnapshotBundleFactoryImpl>(),
};

db::DataStoreRef data_store{
chaindata_env, // NOLINT(cppcoreguidelines-slicing)
repository,
};

test_util::TaskRunner runner;
NoopStageSchedulerAdapter stage_scheduler;
// NOLINTNEXTLINE(cppcoreguidelines-slicing)
db::SnapshotSync snapshot_sync{settings, kMainnetConfig.chain_id, chaindata_env, tmp_dir.path(), stage_scheduler};
db::SnapshotSync snapshot_sync{
settings,
kMainnetConfig.chain_id,
data_store,
tmp_dir.path(),
stage_scheduler,
};
runner.run(snapshot_sync.download_snapshots());
std::chrono::duration elapsed{std::chrono::steady_clock::now() - start};

Expand Down
32 changes: 27 additions & 5 deletions silkworm/capi/fork_validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <silkworm/core/common/base.hpp>
#include <silkworm/infra/common/environment.hpp>
#include <silkworm/node/stagedsync/stages/stage_bodies.hpp>
#include <silkworm/node/stagedsync/stages/stage_bodies_factory.hpp>
#include <silkworm/node/stagedsync/stages_factory_impl.hpp>

#include "common.hpp"
#include "instance.hpp"
Expand Down Expand Up @@ -80,12 +80,29 @@ static void set_node_settings(SilkwormHandle handle, const struct SilkwormForkVa
handle->node_settings.keep_db_txn_open = false; // Ensure that the transaction is closed after each request, Erigon manages transactions differently
}

static silkworm::stagedsync::BodiesStageFactory make_bodies_stage_factory(const silkworm::ChainConfig& chain_config) {
return [chain_config](silkworm::stagedsync::SyncContext* sync_context) {
return std::make_unique<silkworm::stagedsync::BodiesStage>(sync_context, chain_config, [] { return 0; });
static silkworm::stagedsync::BodiesStageFactory make_bodies_stage_factory(
const silkworm::ChainConfig& chain_config,
silkworm::db::DataModelFactory data_model_factory) {
return [chain_config, data_model_factory = std::move(data_model_factory)](silkworm::stagedsync::SyncContext* sync_context) {
return std::make_unique<silkworm::stagedsync::BodiesStage>(
sync_context,
chain_config,
data_model_factory,
[] { return 0; });
};
};

static silkworm::stagedsync::StageContainerFactory make_stages_factory(
const silkworm::NodeSettings& node_settings,
silkworm::db::DataModelFactory data_model_factory) {
auto bodies_stage_factory = make_bodies_stage_factory(*node_settings.chain_config, data_model_factory);
return silkworm::stagedsync::StagesFactoryImpl::to_factory({
node_settings,
std::move(data_model_factory),
std::move(bodies_stage_factory),
});
}

SILKWORM_EXPORT int silkworm_start_fork_validator(SilkwormHandle handle, MDBX_env* mdbx_env, const struct SilkwormForkValidatorSettings* settings) SILKWORM_NOEXCEPT {
if (!handle) {
return SILKWORM_INVALID_HANDLE;
Expand All @@ -111,11 +128,16 @@ SILKWORM_EXPORT int silkworm_start_fork_validator(SilkwormHandle handle, MDBX_en

silkworm::db::EnvUnmanaged unmanaged_env{mdbx_env};
silkworm::db::RWAccess rw_access{unmanaged_env};
silkworm::db::DataModelFactory data_model_factory = [handle](silkworm::db::ROTxn& tx) {
return silkworm::db::DataModel{tx, *handle->snapshot_repository};
};

handle->execution_engine = std::make_unique<silkworm::stagedsync::ExecutionEngine>(
/* executor = */ std::nullopt, // ExecutionEngine manages an internal io_context
handle->node_settings,
data_model_factory,
/* log_timer_factory = */ std::nullopt,
make_bodies_stage_factory(*handle->node_settings.chain_config),
make_stages_factory(handle->node_settings, data_model_factory),
rw_access);

silkworm::log::Info("Execution engine created");
Expand Down
7 changes: 5 additions & 2 deletions silkworm/capi/rpcdaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,13 @@ SILKWORM_EXPORT int silkworm_start_rpcdaemon(SilkwormHandle handle, MDBX_env* en
}

auto daemon_settings = make_daemon_settings(handle, *settings);
db::EnvUnmanaged unmanaged_env{env};
db::DataStoreRef data_store{
db::EnvUnmanaged{env},
*handle->snapshot_repository,
};

// Create the one-and-only Silkrpc daemon
handle->rpcdaemon = std::make_unique<rpc::Daemon>(daemon_settings, std::make_optional<mdbx::env>(unmanaged_env));
handle->rpcdaemon = std::make_unique<rpc::Daemon>(daemon_settings, data_store);

// Check protocol version compatibility with Core Services
if (!daemon_settings.skip_protocol_check) {
Expand Down
42 changes: 28 additions & 14 deletions silkworm/capi/silkworm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,19 +223,19 @@ SILKWORM_EXPORT int silkworm_init(SilkwormHandle* handle, const struct SilkwormS
log::init(log_settings);
log::Info{"Silkworm build info", log_args_for_version()}; // NOLINT(*-unused-raii)

auto data_dir_path = parse_path(settings->data_dir_path);
auto snapshot_repository = std::make_unique<snapshots::SnapshotRepository>(
snapshots::SnapshotSettings{},
DataDirectory{data_dir_path}.snapshots().path(),
std::make_unique<snapshots::StepToBlockNumConverter>(),
std::make_unique<db::SnapshotBundleFactoryImpl>());
db::DataModel::set_snapshot_repository(snapshot_repository.get());

// NOLINTNEXTLINE(bugprone-unhandled-exception-at-new)
*handle = new SilkwormInstance{
.log_settings = std::move(log_settings),
.context_pool_settings = {
.num_contexts = settings->num_contexts > 0 ? settings->num_contexts : silkworm::concurrency::kDefaultNumContexts,
},
.data_dir_path = parse_path(settings->data_dir_path),
.data_dir_path = std::move(data_dir_path),
.node_settings = {},
.snapshot_repository = std::move(snapshot_repository),
.rpcdaemon = {},
Expand Down Expand Up @@ -406,16 +406,18 @@ class BlockProvider {

public:
BlockProvider(BoundedBuffer<std::optional<Block>>* block_buffer,
mdbx::env env,
db::ROAccess db_access,
db::DataModelFactory data_model_factory,
BlockNum start_block, BlockNum max_block)
: block_buffer_{block_buffer},
env_{std::move(env)},
db_access_{std::move(db_access)},
data_model_factory_{std::move(data_model_factory)},
start_block_{start_block},
max_block_{max_block} {}

void operator()() {
db::ROTxnManaged txn{env_};
db::DataModel access_layer{txn};
db::ROTxnManaged txn = db_access_.start_ro_tx();
db::DataModel access_layer = data_model_factory_(txn);

BlockNum current_block{start_block_};
size_t refresh_counter{kTxnRefreshThreshold};
Expand All @@ -433,11 +435,11 @@ class BlockProvider {

if (--refresh_counter == 0) {
txn.abort();
txn = db::ROTxnManaged{env_};
txn = db_access_.start_ro_tx();
refresh_counter = kTxnRefreshThreshold;
}
}
} catch (const boost::thread_interrupted& ti) {
} catch (const boost::thread_interrupted&) {
SILK_TRACE << "thread_interrupted in block provider thread";
} catch (const std::exception& ex) {
SILK_WARN << "unexpected exception in block provider thread: what=" << ex.what();
Expand All @@ -448,7 +450,8 @@ class BlockProvider {

private:
BoundedBuffer<std::optional<Block>>* block_buffer_;
mdbx::env env_;
db::ROAccess db_access_;
db::DataModelFactory data_model_factory_;
BlockNum start_block_;
BlockNum max_block_;
};
Expand Down Expand Up @@ -488,7 +491,7 @@ int silkworm_execute_blocks_ephemeral(SilkwormHandle handle, MDBX_txn* mdbx_txn,
try {
auto txn = db::RWTxnUnmanaged{mdbx_txn};

db::Buffer state_buffer{txn};
db::Buffer state_buffer{txn, std::make_unique<db::BufferFullDataModel>(db::DataModel{txn, *handle->snapshot_repository})};
state_buffer.set_memory_limit(batch_size);

const size_t max_batch_size{batch_size};
Expand All @@ -497,7 +500,7 @@ int silkworm_execute_blocks_ephemeral(SilkwormHandle handle, MDBX_txn* mdbx_txn,
BlockNum block_number{start_block};
BlockNum batch_start_block_number{start_block};
BlockNum last_block_number = 0;
db::DataModel da_layer{txn};
db::DataModel da_layer{txn, *handle->snapshot_repository};

AnalysisCache analysis_cache{execution::block::BlockExecutor::kDefaultAnalysisCacheSize};
execution::block::BlockExecutor block_executor{*chain_info, write_receipts, write_call_traces, write_change_sets};
Expand Down Expand Up @@ -607,12 +610,23 @@ int silkworm_execute_blocks_perpetual(SilkwormHandle handle, MDBX_env* mdbx_env,
auto txn = db::RWTxnManaged{unmanaged_env};
const auto env_path = unmanaged_env.get_path();

db::Buffer state_buffer{txn};
db::Buffer state_buffer{txn, std::make_unique<db::BufferFullDataModel>(db::DataModel{txn, *handle->snapshot_repository})};
state_buffer.set_memory_limit(batch_size);

BoundedBuffer<std::optional<Block>> block_buffer{kMaxBlockBufferSize};
[[maybe_unused]] auto _ = gsl::finally([&block_buffer] { block_buffer.terminate_and_release_all(); });
BlockProvider block_provider{&block_buffer, unmanaged_env, start_block, max_block};

db::DataModelFactory data_model_factory = [handle](db::ROTxn& tx) {
return db::DataModel{tx, *handle->snapshot_repository};
};

BlockProvider block_provider{
&block_buffer,
db::ROAccess{unmanaged_env},
std::move(data_model_factory),
start_block,
max_block,
};
boost::strict_scoped_thread<boost::interrupt_and_join_if_joinable> block_provider_thread(block_provider);

const size_t max_batch_size{batch_size};
Expand Down
Loading

0 comments on commit 213f140

Please sign in to comment.