Skip to content

Commit

Permalink
replace compile reputation flag with config flag (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
s-tikhomirov committed Mar 5, 2025
1 parent 0e66d8e commit e8e36a3
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 36 deletions.
7 changes: 5 additions & 2 deletions tests/waku_lightpush/lightpush_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ proc newTestWakuLightpushNode*(

return proto

proc newTestWakuLightpushClient*(switch: Switch): WakuLightPushClient =
proc newTestWakuLightpushClient*(
switch: Switch,
reputationEnabled: bool = false
): WakuLightPushClient =
let peerManager = PeerManager.new(switch)
let reputationManager =
if defined(reputation):
if reputationEnabled:
some(ReputationManager.new())
else:
none(ReputationManager)
Expand Down
23 changes: 15 additions & 8 deletions tests/waku_lightpush/test_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ suite "Waku Lightpush Client":
server = await newTestWakuLightpushNode(serverSwitch, handler)
serverFailsLightpush =
await newTestWakuLightpushNode(serverSwitchFailsLightpush, handlerFailsLightpush)
client = newTestWakuLightpushClient(clientSwitch)
## FIXME: how to pass reputationEnabled from config to here? should we?
client = newTestWakuLightpushClient(clientSwitch, reputationEnabled = true)

await allFutures(
serverSwitch.start(), serverSwitchFailsLightpush.start(), clientSwitch.start()
Expand Down Expand Up @@ -330,8 +331,8 @@ suite "Waku Lightpush Client":
# Then the response is positive
assertResultOk publishResponse

when defined(reputation):
check client.reputationManager.getReputation(serverRemotePeerInfo.peerId) ==
if client.reputationManager.isSome:
check client.reputationManager.get().getReputation(serverRemotePeerInfo.peerId) ==
some(true)

# TODO: Improve: Add more negative responses variations
Expand All @@ -351,8 +352,8 @@ suite "Waku Lightpush Client":
# Then the response is negative
check not publishResponse.isOk()

when defined(reputation):
check client.reputationManager.getReputation(
if client.reputationManager.isSome:
check client.reputationManager.get().getReputation(
serverRemotePeerInfoFailsLightpush.peerId
) == some(false)

Expand Down Expand Up @@ -386,6 +387,9 @@ suite "Waku Lightpush Client":

check not publishResponse1.isOk()

if client.reputationManager.isSome:
client.reputationManager.get().setReputation(serverRemotePeerInfoFailsLightpush.peerId, some(false))

# add a peer that supports the Lightpush protocol to the client's PeerManager
client.peerManager.addPeer(serverRemotePeerInfo) # supports Lightpush

Expand All @@ -394,12 +398,15 @@ suite "Waku Lightpush Client":

check publishResponse2.isOk()

when defined(reputation):
if client.reputationManager.isSome:
client.reputationManager.get().setReputation(serverRemotePeerInfo.peerId, some(true))

if client.reputationManager.isSome:
# the reputation of a failed peer is negative
check client.reputationManager.getReputation(
check client.reputationManager.get().getReputation(
serverRemotePeerInfoFailsLightpush.peerId
) == some(false)

# the reputation of a successful peer is positive
check client.reputationManager.getReputation(serverRemotePeerInfo.peerId) ==
check client.reputationManager.get().getReputation(serverRemotePeerInfo.peerId) ==
some(true)
8 changes: 8 additions & 0 deletions waku/factory/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,14 @@ hence would have reachability issues.""",
name: "lightpushnode"
.}: string

## Reputation config
## FIXME: should be set to false by default
reputationEnabled* {.
desc: "Enable client-side reputation for light protocols: true|false",
defaultValue: true,
name: "reputation"
.}: bool

## Reliability config
reliabilityEnabled* {.
desc:
Expand Down
2 changes: 2 additions & 0 deletions waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ proc setupProtocols(
else:
return err("failed to set node waku lightpush peer: " & lightPushNode.error)

## TODO: initialize reputation manager here??

# Filter setup. NOTE Must be mounted after relay
if conf.filter:
try:
Expand Down
4 changes: 2 additions & 2 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1009,10 +1009,10 @@ proc mountLightPush*(

node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec))

proc mountLightPushClient*(node: WakuNode) =
proc mountLightPushClient*(node: WakuNode, reputationEnabled: bool = false) =
info "mounting light push client"

node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng)
node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng, reputationEnabled)

proc lightpushPublish*(
node: WakuNode,
Expand Down
43 changes: 19 additions & 24 deletions waku/waku_lightpush/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,18 @@ logScope:
type WakuLightPushClient* = ref object
peerManager*: PeerManager
rng*: ref rand.HmacDrbgContext
reputationManager*: ReputationManager
reputationManager*: Option[ReputationManager]
publishObservers: seq[PublishObserver]

proc new*(
T: type WakuLightPushClient,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
reputationManager: Option[ReputationManager] = none(ReputationManager),
reputationManager: Option[ReputationManager],
): T =
if reputationManager.isSome:
WakuLightPushClient(
peerManager: peerManager, rng: rng, reputationManager: reputationManager.get()
)
else:
WakuLightPushClient(peerManager: peerManager, rng: rng)
WakuLightPushClient(
peerManager: peerManager, rng: rng, reputationManager: reputationManager
)

proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) =
wl.publishObservers.add(obs)
Expand Down Expand Up @@ -75,8 +72,8 @@ proc sendPushRequest(
else:
return err("unknown failure")

when defined(reputation):
wl.reputationManager.updateReputationFromResponse(peer.peerId, response)
if wl.reputationManager.isSome:
wl.reputationManager.get().updateReputationFromResponse(peer.peerId, response)

return ok()

Expand All @@ -92,8 +89,8 @@ proc publish*(
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
let pushResult = await wl.sendPushRequest(pushRequest, peer)
if pushResult.isErr:
when defined(reputation):
wl.reputationManager.setReputation(peer.peerId, some(false))
if wl.reputationManager.isSome:
wl.reputationManager.get().setReputation(peer.peerId, some(false))
return err(pushResult.error)

for obs in wl.publishObservers:
Expand All @@ -110,29 +107,27 @@ proc publish*(
proc selectPeerForLightPush*(
wl: WakuLightPushClient
): Future[Result[RemotePeerInfo, string]] {.async, gcsafe.} =
## If reputation flag is defined, try to ensure the selected peer is not bad-rep.
## Repeat peer selection until either maxAttempts is exceeded,
## or a good-rep or neutral-rep peer is found.
## Note: this procedure CAN return a bad-rep peer if maxAttempts is exceeded.
let maxAttempts = if defined(reputation): 10 else: 1
var attempts = 0
var peerResult: Result[RemotePeerInfo, string]
while attempts < maxAttempts:
let candidate = wl.peerManager.selectPeer(WakuLightPushCodec, none(PubsubTopic)).valueOr:
return err("could not retrieve a peer supporting WakuLightPushCodec")
if not (wl.reputationManager.getReputation(candidate.peerId) == some(false)):
return ok(candidate)
attempts += 1
if wl.reputationManager.isSome():
let reputation = wl.reputationManager.get().getReputation(candidate.peerId)
info "Peer selected", peerId = candidate.peerId, reputation = $reputation, attempts = $attempts
if (reputation == some(false)):
attempts += 1
continue
return ok(candidate)
warn "Maximum reputation-based retries exceeded; continuing with a bad-reputation peer."
# Return last candidate even if it has bad reputation
return peerResult

proc publishToAny*(
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[string]] {.async, gcsafe.} =
## Publish a message via a peer that we get from the peer manager

info "publishToAny", msg_hash = computeMessageHash(pubSubTopic, message).to0xHex

let peer = ?await wl.selectPeerForLightPush()
return await wl.publish(pubSubTopic, message, peer)
let publishResult = await wl.publish(pubSubTopic, message, peer)
info "Publish result", result = $publishResult
return publishResult

0 comments on commit e8e36a3

Please sign in to comment.