Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace charge() by fee_.update() in OnMessage functions #5269

Merged
merged 12 commits into from
Feb 13, 2025
33 changes: 18 additions & 15 deletions src/xrpld/overlay/detail/PeerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
#include <xrpl/basics/random.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/beast/core/LexicalCast.h>
// #include <xrpl/beast/core/SemanticVersion.h>
#include <xrpl/protocol/digest.h>

#include <boost/algorithm/string/predicate.hpp>
Expand Down Expand Up @@ -1111,7 +1110,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMCluster> const& m)
// VFALCO NOTE I think we should drop the peer immediately
if (!cluster())
{
fee_.fee = Resource::feeUselessData;
fee_.update(Resource::feeUselessData, "unknown cluster");
return;
}

Expand Down Expand Up @@ -1189,7 +1188,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
// implication for the protocol.
if (m->endpoints_v2().size() >= 1024)
{
charge(Resource::feeInvalidData, "endpoints too large");
fee_.update(Resource::feeUselessData, "endpoints too large");
return;
}

Expand All @@ -1204,7 +1203,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
{
JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
<< tm.endpoint() << "}";
charge(Resource::feeInvalidData, "endpoints malformed");
fee_.update(Resource::feeInvalidData, "endpoints malformed");
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ximinez This replacement is in a for-loop. Instead of accumulated charges, now only a single invalid fee will be charged.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might fix some of the double-charges that we were detecting!

Copy link
Collaborator

@ximinez ximinez Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ximinez This replacement is in a for-loop. Instead of accumulated charges, now only a single invalid fee will be charged.

I would change this one back to a charge, and leave a comment explaining that multiple instances of invalid data should be charged / punished multiple times.

Another option is to change the continue to a break (or return, but break is better, because there may have been some good ones before the bad one(s)) with the idea that if any part of the message is corrupted, the whole message is probably bad, so don't even bother.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this to charge the fee for each malformed endpoint, while still allowing the valid endpoints to be processed.

@ximinez in which situations can an endpoint become invalid? If a peer at some endpoint disconnects and thus the message cannot be forwarded to it, that would seem like an innocent problem and it would be fine to charge the fee once as a warning / informational notice. If the message contains many invalid endpoints then we'd now charge more (as was the case before), which eventually can lead to being disconnected for bad behavior.

However, if the idea is that when any endpoint is invalid it means that the message is corrupted, then we should just return early and penalize the peer heavily, instead of still processing the first few valid endpoints (while ignoring any other valid endpoints after the invalid one).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding here is that "malformed" fee is charged if the string value fails to parse into an endpoint object (IP & port?). Whether the IP is available is not checked. So the charge doesn't fire if the source gives us "1.2.3.4:51234", but there's no rippled running there, but it will fire if the source gives us "your:mama".

continue;
}

Expand Down Expand Up @@ -1340,7 +1339,7 @@ void
PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
{
auto badData = [&](std::string const& msg) {
charge(Resource::feeInvalidData, "get_ledger " + msg);
fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
};
auto const itype{m->itype()};
Expand Down Expand Up @@ -1431,7 +1430,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
if (!ledgerReplayEnabled_)
{
charge(Resource::feeMalformedRequest, "proof_path_request disabled");
fee_.update(
Resource::feeMalformedRequest, "proof_path_request disabled");
return;
}

Expand Down Expand Up @@ -1468,13 +1468,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
{
if (!ledgerReplayEnabled_)
{
charge(Resource::feeMalformedRequest, "proof_path_response disabled");
fee_.update(
Resource::feeMalformedRequest, "proof_path_response disabled");
return;
}

if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
{
charge(Resource::feeInvalidData, "proof_path_response");
fee_.update(Resource::feeInvalidData, "proof_path_response");
}
}

Expand All @@ -1484,7 +1485,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
if (!ledgerReplayEnabled_)
{
charge(Resource::feeMalformedRequest, "replay_delta_request disabled");
fee_.update(
Resource::feeMalformedRequest, "replay_delta_request disabled");
return;
}

Expand Down Expand Up @@ -1521,13 +1523,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
{
if (!ledgerReplayEnabled_)
{
charge(Resource::feeMalformedRequest, "replay_delta_response disabled");
fee_.update(
Resource::feeMalformedRequest, "replay_delta_response disabled");
return;
}

if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
{
charge(Resource::feeInvalidData, "replay_delta_response");
fee_.update(Resource::feeInvalidData, "replay_delta_response");
}
}

Expand Down Expand Up @@ -2628,22 +2631,22 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)

if (!m->has_validatorpubkey())
{
charge(Resource::feeInvalidData, "squelch no pubkey");
fee_.update(Resource::feeInvalidData, "squelch no pubkey");
return;
}
auto validator = m->validatorpubkey();
auto const slice{makeSlice(validator)};
if (!publicKeyType(slice))
{
charge(Resource::feeInvalidData, "squelch bad pubkey");
fee_.update(Resource::feeInvalidData, "squelch bad pubkey");
return;
}
PublicKey key(slice);

// Ignore non-validator squelch
if (!app_.validators().listed(key))
{
charge(Resource::feeInvalidData, "squelch non-validator");
fee_.update(Resource::feeInvalidData, "squelch non-validator");
JLOG(p_journal_.debug())
<< "onMessage: TMSquelch discarding non-validator squelch "
<< slice;
Expand All @@ -2663,7 +2666,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
if (!m->squelch())
squelch_.removeSquelch(key);
else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
charge(Resource::feeInvalidData, "squelch duration");
fee_.update(Resource::feeInvalidData, "squelch duration");

JLOG(p_journal_.debug())
<< "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
Expand Down
Loading