Skip to content

Commit

Permalink
fair_queue: Replenish tokens on grab, not by timer
Browse files Browse the repository at this point in the history
Right now the token replenisher runs from a group's steady-clock timer.
This causes several problems.

First, the replenishing rate is hard-coded to 500 usec, and
it has no justification other than "seems to work decently enough with
the default latency goal". Next, when the reactor is idle it is woken up
by this timer frequently enough to generate noticeable user time. And
finally, the timer sits on a group and is thus run by a single shard,
which makes the whole group dependent on that shard's stalls.

The proposed fix is to make each shard replenish the capacity when
it really needs it. The benefits of this approach are:

- no magic 500us constant for replenish rate
- no dependency on a single shard rate (replenisher is shard-safe)
- no user-time generation when idling

fixes: scylladb#996

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
  • Loading branch information
xemul committed Jan 11, 2022
1 parent 7f33e27 commit 00174af
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 5 deletions.
4 changes: 2 additions & 2 deletions include/seastar/core/fair_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#pragma once

#include <boost/intrusive/slist.hpp>
#include <seastar/core/timer.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/circular_buffer.hh>
Expand Down Expand Up @@ -194,7 +193,6 @@ private:
* into more tokens in the bucket.
*/

timer<> _replenisher;
const fair_queue_ticket _cost_capacity;
const capacity_t _replenish_rate;
const capacity_t _replenish_limit;
Expand Down Expand Up @@ -267,6 +265,7 @@ public:
clock_type::time_point replenished_ts() const noexcept { return _replenished; }
void release_capacity(capacity_t cap) noexcept;
void replenish_capacity(clock_type::time_point now) noexcept;
// Conditionally replenishes the group's capacity.
//
// Samples clock_type::now() and, if accumulated_capacity() for the time
// elapsed since local_ts is at least _replenish_threshold, advances local_ts
// to the sampled time and calls replenish_capacity() with it. Otherwise
// neither local_ts nor the group is touched.
//
// local_ts is an in/out timestamp owned by the caller.
void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept;

capacity_t capacity_deficiency(capacity_t from) const noexcept;
capacity_t ticket_capacity(fair_queue_ticket ticket) const noexcept;
Expand Down Expand Up @@ -316,6 +315,7 @@ private:

config _config;
fair_group& _group;
clock_type::time_point _group_replenish;
fair_queue_ticket _resources_executing;
fair_queue_ticket _resources_queued;
unsigned _requests_executing = 0;
Expand Down
4 changes: 4 additions & 0 deletions include/seastar/core/io_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include <mutex>
#include <array>

class io_queue_for_tests;

namespace seastar {

class io_priority_class;
Expand Down Expand Up @@ -167,6 +169,8 @@ public:

private:
friend class io_queue;
friend class ::io_queue_for_tests;

const io_queue::config _config;
std::vector<std::unique_ptr<fair_group>> _fgs;

Expand Down
16 changes: 14 additions & 2 deletions src/core/fair_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ uint64_t wrapping_difference(const uint64_t& a, const uint64_t& b) noexcept {

fair_group::fair_group(config cfg) noexcept
: _shares_capacity(cfg.max_weight, cfg.max_size)
, _replenisher([this] { replenish_capacity(clock_type::now()); })
, _cost_capacity(cfg.weight_rate / std::chrono::duration_cast<rate_resolution>(std::chrono::seconds(1)).count(), cfg.size_rate / std::chrono::duration_cast<rate_resolution>(std::chrono::seconds(1)).count())
, _replenish_rate(cfg.rate_factor * fixed_point_factor)
, _replenish_limit(_replenish_rate * std::chrono::duration_cast<rate_resolution>(cfg.rate_limit_duration).count())
Expand All @@ -104,7 +103,6 @@ fair_group::fair_group(config cfg) noexcept
assert(!wrapping_difference(_capacity_tail.load(std::memory_order_relaxed), _capacity_head.load(std::memory_order_relaxed)));
seastar_logger.info("Created fair group {}, capacity shares {} rate {}, limit {}, rate {} (factor {}), threshold {}", cfg.label,
_shares_capacity, _cost_capacity, _replenish_limit, _replenish_rate, cfg.rate_factor, _replenish_threshold);
_replenisher.arm_periodic(std::chrono::microseconds(500));
}

auto fair_group::grab_capacity(capacity_t cap) noexcept -> capacity_t {
Expand Down Expand Up @@ -135,6 +133,16 @@ void fair_group::replenish_capacity(clock_type::time_point now) noexcept {
}
}

// Replenishes the group only when enough capacity has built up since the
// caller's last successful replenish.
//
// local_ts is the caller-owned timestamp of that last replenish. It is
// advanced to the current time only when the threshold is reached and
// replenish_capacity() is actually invoked; otherwise it is left untouched so
// the elapsed interval keeps growing across calls.
void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept {
    const auto current = clock_type::now();

    // Not enough accumulated since local_ts yet: nothing to do.
    if (accumulated_capacity(current - local_ts) < _replenish_threshold) {
        return;
    }

    local_ts = current;
    replenish_capacity(current);
}

// Returns wrapping_difference(from, head), where head is a relaxed snapshot
// of _capacity_head taken at call time.
auto fair_group::capacity_deficiency(capacity_t from) const noexcept -> capacity_t {
    const auto head_now = _capacity_head.load(std::memory_order_relaxed);
    return wrapping_difference(from, head_now);
}
Expand Down Expand Up @@ -170,12 +178,14 @@ bool fair_queue::class_compare::operator() (const priority_class_ptr& lhs, const
// Constructs a fair_queue attached to the given fair_group.
//
// cfg is taken by value and moved into _config; group is stored by reference.
fair_queue::fair_queue(fair_group& group, config cfg)
: _config(std::move(cfg))
, _group(group)
// Seed the queue-local replenish timestamp with the construction time. This
// member is what gets passed to fair_group::maybe_replenish_capacity() when
// the queue tries to grab pending capacity.
, _group_replenish(clock_type::now())
{
}

fair_queue::fair_queue(fair_queue&& other)
: _config(std::move(other._config))
, _group(other._group)
, _group_replenish(std::move(other._group_replenish))
, _resources_executing(std::exchange(other._resources_executing, fair_queue_ticket{}))
, _resources_queued(std::exchange(other._resources_queued, fair_queue_ticket{}))
, _requests_executing(std::exchange(other._requests_executing, 0))
Expand Down Expand Up @@ -218,6 +228,8 @@ void fair_queue::pop_priority_class(priority_class_data& pc) {
}

bool fair_queue::grab_pending_capacity(const fair_queue_entry& ent) noexcept {
_group.maybe_replenish_capacity(_group_replenish);

if (_group.capacity_deficiency(_pending->head)) {
return false;
}
Expand Down
12 changes: 11 additions & 1 deletion tests/unit/io_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,22 @@ struct io_queue_for_tests {
io_group_ptr group;
internal::io_sink sink;
io_queue queue;
timer<> kicker;

io_queue_for_tests()
: group(std::make_shared<io_group>(io_queue::config{0}))
, sink()
, queue(group, sink)
{}
, kicker([this] { kick(); })
{
kicker.arm_periodic(std::chrono::microseconds(500));
}

// Calls replenish_capacity() on every fair group held by the test io_group,
// sampling a fresh steady-clock timestamp for each call.
void kick() {
    for (const auto& fair_grp : group->_fgs) {
        fair_grp->replenish_capacity(std::chrono::steady_clock::now());
    }
}
};

SEASTAR_THREAD_TEST_CASE(test_basic_flow) {
Expand Down

0 comments on commit 00174af

Please sign in to comment.