Skip to content

Commit

Permalink
fix: Replace charge() by fee_.update() in OnMessage functions (#5269)
Browse files Browse the repository at this point in the history
In PeerImpl.cpp, if the function is a message handler (onMessage) or called directly from a message handler, then it should use fee_, since when the handler returns (OnMessageEnd) then the charge function is called. If the function is not a message handler, such as a job queue item, it should remain charge.
  • Loading branch information
bthomee authored and q73zhao committed Feb 24, 2025
1 parent 87b1318 commit 4a1ef3d
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 21 deletions.
3 changes: 3 additions & 0 deletions include/xrpl/resource/Charge.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class Charge
std::strong_ordering
operator<=>(Charge const&) const;

Charge
operator*(value_type m) const;

private:
value_type m_cost;
std::string m_label;
Expand Down
6 changes: 6 additions & 0 deletions src/libxrpl/resource/Charge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,11 @@ Charge::operator<=>(Charge const& c) const
return m_cost <=> c.m_cost;
}

Charge
Charge::operator*(value_type m) const
{
return Charge(m_cost * m, m_label);
}

} // namespace Resource
} // namespace ripple
53 changes: 32 additions & 21 deletions src/xrpld/overlay/detail/PeerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,11 @@
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/Tuning.h>
#include <xrpld/overlay/predicates.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/UptimeClock.h>
#include <xrpl/basics/base64.h>
#include <xrpl/basics/random.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/beast/core/LexicalCast.h>
// #include <xrpl/beast/core/SemanticVersion.h>
#include <xrpl/protocol/digest.h>

#include <boost/algorithm/string/predicate.hpp>
Expand Down Expand Up @@ -1111,7 +1108,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMCluster> const& m)
// VFALCO NOTE I think we should drop the peer immediately
if (!cluster())
{
fee_.fee = Resource::feeUselessData;
fee_.update(Resource::feeUselessData, "unknown cluster");
return;
}

Expand Down Expand Up @@ -1189,13 +1186,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
// implication for the protocol.
if (m->endpoints_v2().size() >= 1024)
{
charge(Resource::feeInvalidData, "endpoints too large");
fee_.update(Resource::feeUselessData, "endpoints too large");
return;
}

std::vector<PeerFinder::Endpoint> endpoints;
endpoints.reserve(m->endpoints_v2().size());

auto malformed = 0;
for (auto const& tm : m->endpoints_v2())
{
auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint());
Expand All @@ -1204,7 +1202,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
{
JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
<< tm.endpoint() << "}";
charge(Resource::feeInvalidData, "endpoints malformed");
malformed++;
continue;
}

Expand All @@ -1220,6 +1218,15 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
endpoints.emplace_back(*result, tm.hops());
}

// Charge the peer for each malformed endpoint. As there still may be
// multiple valid endpoints we don't return early.
if (malformed > 0)
{
fee_.update(
Resource::feeInvalidData * malformed,
std::to_string(malformed) + " malformed endpoints");
}

if (!endpoints.empty())
overlay_.peerFinder().on_endpoints(slot_, endpoints);
}
Expand Down Expand Up @@ -1341,7 +1348,7 @@ void
PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
{
auto badData = [&](std::string const& msg) {
charge(Resource::feeInvalidData, "get_ledger " + msg);
fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
};
auto const itype{m->itype()};
Expand Down Expand Up @@ -1432,7 +1439,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
if (!ledgerReplayEnabled_)
{
charge(Resource::feeMalformedRequest, "proof_path_request disabled");
fee_.update(
Resource::feeMalformedRequest, "proof_path_request disabled");
return;
}

Expand Down Expand Up @@ -1469,13 +1477,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
{
if (!ledgerReplayEnabled_)
{
charge(Resource::feeMalformedRequest, "proof_path_response disabled");
fee_.update(
Resource::feeMalformedRequest, "proof_path_response disabled");
return;
}

if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
{
charge(Resource::feeInvalidData, "proof_path_response");
fee_.update(Resource::feeInvalidData, "proof_path_response");
}
}

Expand All @@ -1485,7 +1494,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
if (!ledgerReplayEnabled_)
{
charge(Resource::feeMalformedRequest, "replay_delta_request disabled");
fee_.update(
Resource::feeMalformedRequest, "replay_delta_request disabled");
return;
}

Expand Down Expand Up @@ -1522,13 +1532,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
{
if (!ledgerReplayEnabled_)
{
charge(Resource::feeMalformedRequest, "replay_delta_response disabled");
fee_.update(
Resource::feeMalformedRequest, "replay_delta_response disabled");
return;
}

if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
{
charge(Resource::feeInvalidData, "replay_delta_response");
fee_.update(Resource::feeInvalidData, "replay_delta_response");
}
}

Expand Down Expand Up @@ -2409,10 +2420,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
return;
}

fee_.update(
Resource::feeModerateBurdenPeer,
" received a get object by hash request");

protocol::TMGetObjectByHash reply;

reply.set_query(false);
Expand All @@ -2433,6 +2440,10 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
reply.set_ledgerhash(packet.ledgerhash());
}

fee_.update(
Resource::feeModerateBurdenPeer,
" received a get object by hash request");

// This is a very minimal implementation
for (int i = 0; i < packet.objects_size(); ++i)
{
Expand Down Expand Up @@ -2629,22 +2640,22 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)

if (!m->has_validatorpubkey())
{
charge(Resource::feeInvalidData, "squelch no pubkey");
fee_.update(Resource::feeInvalidData, "squelch no pubkey");
return;
}
auto validator = m->validatorpubkey();
auto const slice{makeSlice(validator)};
if (!publicKeyType(slice))
{
charge(Resource::feeInvalidData, "squelch bad pubkey");
fee_.update(Resource::feeInvalidData, "squelch bad pubkey");
return;
}
PublicKey key(slice);

// Ignore non-validator squelch
if (!app_.validators().listed(key))
{
charge(Resource::feeInvalidData, "squelch non-validator");
fee_.update(Resource::feeInvalidData, "squelch non-validator");
JLOG(p_journal_.debug())
<< "onMessage: TMSquelch discarding non-validator squelch "
<< slice;
Expand All @@ -2664,7 +2675,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
if (!m->squelch())
squelch_.removeSquelch(key);
else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
charge(Resource::feeInvalidData, "squelch duration");
fee_.update(Resource::feeInvalidData, "squelch duration");

JLOG(p_journal_.debug())
<< "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
Expand Down

0 comments on commit 4a1ef3d

Please sign in to comment.