Skip to content

Commit

Permalink
thread safe scheduler (#2055)
Browse files Browse the repository at this point in the history
Signed-off-by: turuslan <turuslan.devbox@gmail.com>
  • Loading branch information
turuslan authored Apr 23, 2024
1 parent dde7ca1 commit 2160c7d
Show file tree
Hide file tree
Showing 13 changed files with 57 additions and 58 deletions.
2 changes: 1 addition & 1 deletion cmake/Hunter/config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ hunter_config(

hunter_config(
libp2p
VERSION 0.1.19
VERSION 0.1.20
KEEP_PACKAGE_SOURCES
)

Expand Down
4 changes: 2 additions & 2 deletions cmake/Hunter/hunter-gate-url.cmake
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
HunterGate(
URL https://github.com/qdrvm/hunter/archive/refs/tags/v0.25.3-qdrvm7.zip
SHA1 be5869134ef7448fe2420d60dbb9706596b1b8bd
URL https://github.com/qdrvm/hunter/archive/refs/tags/v0.25.3-qdrvm8.zip
SHA1 dc0af42b358dc0bcab304a455e80681c12d52e0f
LOCAL
)
44 changes: 23 additions & 21 deletions core/consensus/grandpa/impl/grandpa_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,24 +175,7 @@ namespace kagome::consensus::grandpa {
}

// Timer to send neighbor message if round does not change long time (1 min)
fallback_timer_handle_ = scheduler_->scheduleWithHandle(
[wp{weak_from_this()}] {
auto self = wp.lock();
if (not self) {
return;
}
BOOST_ASSERT_MSG(self->current_round_,
"Current round must be defiled anytime after start");
auto round =
std::dynamic_pointer_cast<VotingRoundImpl>(self->current_round_);
if (round) {
round->sendNeighborMessage();
}

std::ignore =
self->fallback_timer_handle_.reschedule(std::chrono::minutes(1));
},
std::chrono::minutes(1));
setTimerFallback();

tryExecuteNextRound(current_round_);

Expand All @@ -206,7 +189,7 @@ namespace kagome::consensus::grandpa {
}

void GrandpaImpl::stop() {
fallback_timer_handle_.cancel();
fallback_timer_handle_.reset();
}

std::shared_ptr<VotingRound> GrandpaImpl::makeInitialRound(
Expand Down Expand Up @@ -387,7 +370,7 @@ namespace kagome::consensus::grandpa {
BOOST_ASSERT(res.value() != nullptr);
current_round_ = std::move(res.value());

std::ignore = fallback_timer_handle_.reschedule(std::chrono::minutes(1));
setTimerFallback();

// Truncate chain of rounds
size_t i = 0;
Expand Down Expand Up @@ -743,7 +726,7 @@ namespace kagome::consensus::grandpa {

::libp2p::common::FinalAction cleanup([&] {
if (need_cleanup_when_exiting_scope) {
catchup_request_timer_handle_.cancel();
catchup_request_timer_handle_.reset();
pending_catchup_request_.reset();
}
});
Expand Down Expand Up @@ -1472,4 +1455,23 @@ namespace kagome::consensus::grandpa {
}
update.update();
}

void GrandpaImpl::setTimerFallback() {
fallback_timer_handle_ = scheduler_->scheduleWithHandle(
[weak_self{weak_from_this()}] {
auto self = weak_self.lock();
if (not self) {
return;
}
BOOST_ASSERT_MSG(self->current_round_,
"Current round must be defiled anytime after start");
auto round =
std::dynamic_pointer_cast<VotingRoundImpl>(self->current_round_);
if (round) {
round->sendNeighborMessage();
}
self->setTimerFallback();
},
std::chrono::minutes(1));
}
} // namespace kagome::consensus::grandpa
2 changes: 2 additions & 0 deletions core/consensus/grandpa/impl/grandpa_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ namespace kagome::consensus::grandpa {
void saveCachedVotes();
void applyCachedVotes(VotingRound &round);

void setTimerFallback();

log::Logger logger_ = log::createLogger("Grandpa", "grandpa");

const size_t kVotesCacheSize = 5;
Expand Down
10 changes: 5 additions & 5 deletions core/consensus/grandpa/impl/voting_round_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ namespace kagome::consensus::grandpa {
}
BOOST_ASSERT(stage_ == Stage::PREVOTE_RUNS);

stage_timer_handle_.cancel();
stage_timer_handle_.reset();
on_complete_handler_ = nullptr;

stage_ = Stage::END_PREVOTE;
Expand Down Expand Up @@ -356,7 +356,7 @@ namespace kagome::consensus::grandpa {
BOOST_ASSERT(stage_ == Stage::PRECOMMIT_RUNS
|| stage_ == Stage::PRECOMMIT_WAITS_FOR_PREVOTES);

stage_timer_handle_.cancel();
stage_timer_handle_.reset();

// https://github.com/paritytech/finality-grandpa/blob/8c45a664c05657f0c71057158d3ba555ba7d20de/src/voter/voting_round.rs#L630-L633
if (not prevote_ghost_) {
Expand Down Expand Up @@ -438,7 +438,7 @@ namespace kagome::consensus::grandpa {
}
BOOST_ASSERT(stage_ == Stage::WAITING_RUNS);

stage_timer_handle_.cancel();
stage_timer_handle_.reset();
on_complete_handler_ = nullptr;

// Final attempt to finalize round what should be success
Expand All @@ -452,8 +452,8 @@ namespace kagome::consensus::grandpa {
if (stage_ != Stage::COMPLETED) {
SL_DEBUG(logger_, "Round #{}: End round", round_number_);
on_complete_handler_ = nullptr;
stage_timer_handle_.cancel();
pending_timer_handle_.cancel();
stage_timer_handle_.reset();
pending_timer_handle_.reset();
stage_ = Stage::COMPLETED;
}
}
Expand Down
1 change: 0 additions & 1 deletion core/dispute_coordinator/impl/dispute_coordinator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2152,7 +2152,6 @@ namespace kagome::dispute {

void DisputeCoordinatorImpl::process_portion_incoming_disputes() {
if (rate_limit_timer_) {
rate_limit_timer_->cancel();
rate_limit_timer_.reset();
}

Expand Down
2 changes: 1 addition & 1 deletion core/network/impl/peer_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ namespace kagome::network {
+ app_config_.outPeers();
const auto peer_ttl = app_config_.peeringConfig().peerTtl;

align_timer_.cancel();
align_timer_.reset();

clearClosedPingingConnections();

Expand Down
13 changes: 10 additions & 3 deletions core/network/impl/reputation_repository_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ namespace kagome::network {
void ReputationRepositoryImpl::start() {
main_thread_->execute([weak{weak_from_this()}] {
if (auto self = weak.lock()) {
self->tick_handler_ =
self->scheduler_->scheduleWithHandle([self] { self->tick(); }, 1s);
self->tick();
}
});
}
Expand Down Expand Up @@ -137,7 +136,15 @@ namespace kagome::network {
reputation_table_.erase(cit);
}
}
std::ignore = tick_handler_.reschedule(1s);
tick_handler_ = scheduler_->scheduleWithHandle(
[weak_self{weak_from_this()}] {
auto self = weak_self.lock();
if (not self) {
return;
}
self->tick();
},
1s);
}

} // namespace kagome::network
8 changes: 4 additions & 4 deletions core/telemetry/impl/service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ namespace kagome::telemetry {

void TelemetryServiceImpl::stop() {
shutdown_requested_ = true;
frequent_timer_.cancel();
delayed_timer_.cancel();
frequent_timer_.reset();
delayed_timer_.reset();
for (auto &connection : connections_) {
connection->shutdown();
}
Expand Down Expand Up @@ -180,7 +180,7 @@ namespace kagome::telemetry {
}

void TelemetryServiceImpl::frequentNotificationsRoutine() {
frequent_timer_.cancel();
frequent_timer_.reset();
if (shutdown_requested_) {
return;
}
Expand Down Expand Up @@ -216,7 +216,7 @@ namespace kagome::telemetry {
}

void TelemetryServiceImpl::delayedNotificationsRoutine() {
delayed_timer_.cancel();
delayed_timer_.reset();
if (shutdown_requested_) {
return;
}
Expand Down
11 changes: 0 additions & 11 deletions test/core/consensus/beefy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ using testing::_;
using testing::Return;

struct Timer : libp2p::basic::Scheduler {
void pulse(std::chrono::milliseconds current_clock) noexcept override {
abort();
}
std::chrono::milliseconds now() const noexcept override {
abort();
}
Expand All @@ -83,14 +80,6 @@ struct Timer : libp2p::basic::Scheduler {
cb_.emplace(std::move(cb));
return Handle{};
}
void cancel(Handle::Ticket ticket) noexcept override {
abort();
}
outcome::result<Handle::Ticket> reschedule(
Handle::Ticket ticket,
std::chrono::milliseconds delay_from_now) noexcept override {
abort();
}

void call() {
if (cb_) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ class VotingRoundTest : public testing::Test,
vote_graph_ = std::make_shared<VoteGraphImpl>(base, config.voters, env_);

scheduler_ = std::make_shared<libp2p::basic::SchedulerMock>();
EXPECT_CALL(*scheduler_, scheduleImplMockCall(_, _, _)).Times(AnyNumber());
EXPECT_CALL(*scheduler_, nowMockCall()).Times(AnyNumber());
EXPECT_CALL(*scheduler_, scheduleImpl(_, _, _)).Times(AnyNumber());
EXPECT_CALL(*scheduler_, now()).Times(AnyNumber());

previous_round_ = std::make_shared<VotingRoundMock>();
ON_CALL(*previous_round_, lastFinalizedBlock())
Expand Down
12 changes: 6 additions & 6 deletions test/core/consensus/timeline/timeline_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ TEST_F(TimelineTest, SingleValidator) {
.WillRepeatedly(Return(ValidatorStatus::SingleValidator));
EXPECT_CALL(*production_consensus, processSlot(_, best_block)).Times(0);
// - start to wait for end of current slot
EXPECT_CALL(*scheduler, scheduleImplMockCall(_, _, false))
EXPECT_CALL(*scheduler, scheduleImpl(_, _, false))
.WillOnce(WithArg<0>(Invoke([&](auto cb) {
on_run_slot = std::move(cb);
return SchedulerMock::Handle{};
Expand All @@ -285,7 +285,7 @@ TEST_F(TimelineTest, SingleValidator) {
EXPECT_CALL(*production_consensus, processSlot(current_slot, best_block))
.WillOnce(Return(outcome::success()));
// - start to wait for end of current slot
EXPECT_CALL(*scheduler, scheduleImplMockCall(_, _, false))
EXPECT_CALL(*scheduler, scheduleImpl(_, _, false))
.WillOnce(WithArg<0>(
Invoke([&](auto cb) { return SchedulerMock::Handle{}; })));

Expand Down Expand Up @@ -324,7 +324,7 @@ TEST_F(TimelineTest, Validator) {
// - don't process slot, because node is not synchronized
EXPECT_CALL(*production_consensus, processSlot(_, best_block)).Times(0);
// - don't wait time to run slot, because node is not synchronized
EXPECT_CALL(*scheduler, scheduleImplMockCall(_, _, _)).Times(0);
EXPECT_CALL(*scheduler, scheduleImpl(_, _, _)).Times(0);

timeline->start();

Expand All @@ -349,7 +349,7 @@ TEST_F(TimelineTest, Validator) {
// - process slot won't start, because slot is not changed
EXPECT_CALL(*production_consensus, processSlot(_, _)).Times(0);
// - start to wait for end of current slot
EXPECT_CALL(*scheduler, scheduleImplMockCall(_, _, false))
EXPECT_CALL(*scheduler, scheduleImpl(_, _, false))
.WillOnce(WithArg<0>(Invoke([&](auto cb) {
on_run_slot_2 = std::move(cb);
return SchedulerMock::Handle{};
Expand Down Expand Up @@ -380,7 +380,7 @@ TEST_F(TimelineTest, Validator) {
EXPECT_CALL(*production_consensus, processSlot(current_slot, best_block))
.WillOnce(Return(SlotLeadershipError::NO_SLOT_LEADER));
// - start to wait for end of current slot
EXPECT_CALL(*scheduler, scheduleImplMockCall(_, _, false))
EXPECT_CALL(*scheduler, scheduleImpl(_, _, false))
.WillOnce(WithArg<0>(Invoke([&](auto cb) {
on_run_slot_3 = std::move(cb);
return SchedulerMock::Handle{};
Expand All @@ -406,7 +406,7 @@ TEST_F(TimelineTest, Validator) {
EXPECT_CALL(*production_consensus, processSlot(current_slot, best_block))
.WillOnce(Return(outcome::success()));
// - start to wait for end of current slot
EXPECT_CALL(*scheduler, scheduleImplMockCall(_, _, false))
EXPECT_CALL(*scheduler, scheduleImpl(_, _, false))
.WillOnce(WithArg<0>(Invoke([&](auto cb) {
on_run_slot_3 = std::move(cb);
return SchedulerMock::Handle{};
Expand Down
2 changes: 1 addition & 1 deletion test/core/network/synchronizer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class SynchronizerTest
EXPECT_CALL(*router, getSyncProtocol())
.WillRepeatedly(Return(sync_protocol));

EXPECT_CALL(*scheduler, scheduleImplMockCall(_, _, _)).Times(AnyNumber());
EXPECT_CALL(*scheduler, scheduleImpl(_, _, _)).Times(AnyNumber());

EXPECT_CALL(app_config, syncMethod())
.WillOnce(Return(application::SyncMethod::Full));
Expand Down

0 comments on commit 2160c7d

Please sign in to comment.