Skip to content

Commit

Permalink
Merge "Lazy IO capacity replenishment" from Pavel E
Browse files Browse the repository at this point in the history
"
The rate-limiter-based IO scheduler uses two token-buckets to rate limit
the requests rate. The tokens are put into the second bucket (from where
they are then grabbed for dispatch) by the procedure called "replenisher"
which is run by a steady timer.

This timer generates several troubles: its rate is magically selected,
it runs on a single shard, it generates a noticeable user time when the
reactor is idling.

To fix that the proposal is to make io-queue poller replenish the tokens
from all shards when they need them. Before this change it's worth tuning
the replenishment threshold to be not less than the minimal capacity that
can be claimed from the group.

Verified on i3en instance with the rl-iosched.

tests: unit(dev), manual.rl-iosched(dev)

This set places one more item into the TODO list.

If the disk slows down for some reason the replenisher may start
generating more tokens for the 2nd bucket than there appears on the 1st.
When it happens the replenishment code drops some re-generated tokens
until some future time, thus slowing down its rate. This behavior is
deliberate and was aimed at making the token-buckets adopt to the real
disk speed.

However, this logic may lead to false drops. The tokens appear on the
1st bucket in batches, with the "trendline" being at the expected rate.
However, the replenisher most likely runs between those batches thus
constantly generating more tokens just because those batches are not
"linear enough".

This is what surfaced during verification -- when the replenisher was
switched into on-demand manner it became more "aggressive" thus losing
more tokens. This was partially addressed by the threshold increase, but
some more care is still needed.
"

Fixes #996

* 'br-fair-group-replenish-relax' of https://github.com/xemul/seastar:
  fair_queue: Replenish tokens on grab, not by timer
  fair_queue, io_queue: Configure replenish threshold from minimal IO request
  fair_group: Generalize duration -> capacity conversion
  fair_queue: Tune-up clock_type
  fair_queue: Remove unused _base
  • Loading branch information
avikivity committed Jan 11, 2022
2 parents eccb5c3 + 6e1d719 commit 5025cd4
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 15 deletions.
18 changes: 13 additions & 5 deletions include/seastar/core/fair_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#pragma once

#include <boost/intrusive/slist.hpp>
#include <seastar/core/timer.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/circular_buffer.hh>
Expand All @@ -32,6 +31,7 @@
#include <chrono>
#include <unordered_set>
#include <optional>
#include <cmath>

namespace bi = boost::intrusive;

Expand Down Expand Up @@ -193,7 +193,6 @@ private:
* into more tokens in the bucket.
*/

timer<> _replenisher;
const fair_queue_ticket _cost_capacity;
const capacity_t _replenish_rate;
const capacity_t _replenish_limit;
Expand Down Expand Up @@ -221,6 +220,12 @@ private:

capacity_t fetch_add(fair_group_atomic_rover& rover, capacity_t cap) noexcept;

template <typename Rep, typename Period>
capacity_t accumulated_capacity(const std::chrono::duration<Rep, Period> delta) const noexcept {
auto delta_at_rate = std::chrono::duration_cast<rate_resolution>(delta);
return std::round(_replenish_rate * delta_at_rate.count());
}

public:

/*
Expand All @@ -242,6 +247,8 @@ public:
sstring label = "";
unsigned max_weight;
unsigned max_size;
unsigned min_weight = 0;
unsigned min_size = 0;
unsigned long weight_rate;
unsigned long size_rate;
float rate_factor = 1.0;
Expand All @@ -258,6 +265,7 @@ public:
clock_type::time_point replenished_ts() const noexcept { return _replenished; }
void release_capacity(capacity_t cap) noexcept;
void replenish_capacity(clock_type::time_point now) noexcept;
void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept;

capacity_t capacity_deficiency(capacity_t from) const noexcept;
capacity_t ticket_capacity(fair_queue_ticket ticket) const noexcept;
Expand Down Expand Up @@ -299,19 +307,19 @@ public:
using accumulator_t = double;

private:
using clock_type = std::chrono::steady_clock;
using priority_class_ptr = priority_class_data*;
struct class_compare {
bool operator() (const priority_class_ptr& lhs, const priority_class_ptr & rhs) const noexcept;
};

config _config;
fair_group& _group;
clock_type::time_point _group_replenish;
fair_queue_ticket _resources_executing;
fair_queue_ticket _resources_queued;
unsigned _requests_executing = 0;
unsigned _requests_queued = 0;
using clock_type = std::chrono::steady_clock::time_point;
clock_type _base;
using prioq = std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare>;
prioq _handles;
std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
Expand Down Expand Up @@ -397,7 +405,7 @@ public:
/// Try to execute new requests if there is capacity left in the queue.
void dispatch_requests(std::function<void(fair_queue_entry&)> cb);

clock_type next_pending_aio() const noexcept {
clock_type::time_point next_pending_aio() const noexcept {
if (_pending) {
/*
* We expect the disk to release the ticket within some time,
Expand Down
4 changes: 4 additions & 0 deletions include/seastar/core/io_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include <mutex>
#include <array>

class io_queue_for_tests;

namespace seastar {

class io_priority_class;
Expand Down Expand Up @@ -167,6 +169,8 @@ public:

private:
friend class io_queue;
friend class ::io_queue_for_tests;

const io_queue::config _config;
std::vector<std::unique_ptr<fair_group>> _fgs;

Expand Down
27 changes: 18 additions & 9 deletions src/core/fair_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include <queue>
#include <chrono>
#include <unordered_set>
#include <cmath>

#include "fmt/format.h"
#include "fmt/ostream.h"
Expand Down Expand Up @@ -92,11 +91,10 @@ uint64_t wrapping_difference(const uint64_t& a, const uint64_t& b) noexcept {

fair_group::fair_group(config cfg) noexcept
: _shares_capacity(cfg.max_weight, cfg.max_size)
, _replenisher([this] { replenish_capacity(clock_type::now()); })
, _cost_capacity(cfg.weight_rate / std::chrono::duration_cast<rate_resolution>(std::chrono::seconds(1)).count(), cfg.size_rate / std::chrono::duration_cast<rate_resolution>(std::chrono::seconds(1)).count())
, _replenish_rate(cfg.rate_factor * fixed_point_factor)
, _replenish_limit(_replenish_rate * std::chrono::duration_cast<rate_resolution>(cfg.rate_limit_duration).count())
, _replenish_threshold(1) // FIXME -- too frequest replenish
, _replenish_threshold(std::max((capacity_t)1, ticket_capacity(fair_queue_ticket(cfg.min_weight, cfg.min_size))))
, _replenished(clock_type::now())
, _capacity_tail(0)
, _capacity_head(0)
Expand All @@ -105,7 +103,6 @@ fair_group::fair_group(config cfg) noexcept
assert(!wrapping_difference(_capacity_tail.load(std::memory_order_relaxed), _capacity_head.load(std::memory_order_relaxed)));
seastar_logger.info("Created fair group {}, capacity shares {} rate {}, limit {}, rate {} (factor {}), threshold {}", cfg.label,
_shares_capacity, _cost_capacity, _replenish_limit, _replenish_rate, cfg.rate_factor, _replenish_threshold);
_replenisher.arm_periodic(std::chrono::microseconds(500));
}

auto fair_group::grab_capacity(capacity_t cap) noexcept -> capacity_t {
Expand All @@ -123,11 +120,11 @@ void fair_group::replenish_capacity(clock_type::time_point now) noexcept {
return;
}

auto delta = std::chrono::duration_cast<rate_resolution>(now - ts);
capacity_t extra = std::round(_replenish_rate * delta.count());
auto delta = now - ts;
auto extra = accumulated_capacity(now - ts);

if (extra >= _replenish_threshold) {
if (!_replenished.compare_exchange_weak(ts, ts + std::chrono::duration_cast<std::chrono::nanoseconds>(delta))) {
if (!_replenished.compare_exchange_weak(ts, ts + delta)) {
return; // next time or another shard
}

Expand All @@ -136,6 +133,16 @@ void fair_group::replenish_capacity(clock_type::time_point now) noexcept {
}
}

void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept {
auto now = clock_type::now();
auto extra = accumulated_capacity(now - local_ts);

if (extra >= _replenish_threshold) {
local_ts = now;
replenish_capacity(now);
}
}

auto fair_group::capacity_deficiency(capacity_t from) const noexcept -> capacity_t {
return wrapping_difference(from, _capacity_head.load(std::memory_order_relaxed));
}
Expand Down Expand Up @@ -171,18 +178,18 @@ bool fair_queue::class_compare::operator() (const priority_class_ptr& lhs, const
fair_queue::fair_queue(fair_group& group, config cfg)
: _config(std::move(cfg))
, _group(group)
, _base(std::chrono::steady_clock::now())
, _group_replenish(clock_type::now())
{
}

fair_queue::fair_queue(fair_queue&& other)
: _config(std::move(other._config))
, _group(other._group)
, _group_replenish(std::move(other._group_replenish))
, _resources_executing(std::exchange(other._resources_executing, fair_queue_ticket{}))
, _resources_queued(std::exchange(other._resources_queued, fair_queue_ticket{}))
, _requests_executing(std::exchange(other._requests_executing, 0))
, _requests_queued(std::exchange(other._requests_queued, 0))
, _base(other._base)
, _handles(std::move(other._handles))
, _priority_classes(std::move(other._priority_classes))
{
Expand Down Expand Up @@ -221,6 +228,8 @@ void fair_queue::pop_priority_class(priority_class_data& pc) {
}

bool fair_queue::grab_pending_capacity(const fair_queue_entry& ent) noexcept {
_group.maybe_replenish_capacity(_group_replenish);

if (_group.capacity_deficiency(_pending->head)) {
return false;
}
Expand Down
2 changes: 2 additions & 0 deletions src/core/io_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,8 @@ fair_group::config io_group::make_fair_group_config(const io_queue::config& qcfg
cfg.label = fmt::format("io-queue-{}", qcfg.devid);
cfg.max_weight = max_req_count;
cfg.max_size = qcfg.max_blocks_count;
cfg.min_weight = std::min(io_queue::read_request_base_count, qcfg.disk_req_write_to_read_multiplier);
cfg.min_size = std::min(io_queue::read_request_base_count, qcfg.disk_blocks_write_to_read_multiplier);
cfg.weight_rate = qcfg.req_count_rate;
cfg.size_rate = qcfg.blocks_count_rate;
cfg.rate_factor = qcfg.rate_factor;
Expand Down
12 changes: 11 additions & 1 deletion tests/unit/io_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,22 @@ struct io_queue_for_tests {
io_group_ptr group;
internal::io_sink sink;
io_queue queue;
timer<> kicker;

io_queue_for_tests()
: group(std::make_shared<io_group>(io_queue::config{0}))
, sink()
, queue(group, sink)
{}
, kicker([this] { kick(); })
{
kicker.arm_periodic(std::chrono::microseconds(500));
}

void kick() {
for (auto&& fg : group->_fgs) {
fg->replenish_capacity(std::chrono::steady_clock::now());
}
}
};

SEASTAR_THREAD_TEST_CASE(test_basic_flow) {
Expand Down

0 comments on commit 5025cd4

Please sign in to comment.