Skip to content

Commit

Permalink
feat: unbundle fixed-label metrics (#460)
Browse files Browse the repository at this point in the history
feat: unbundle fixed-label metrics
  • Loading branch information
twoeths authored Aug 10, 2023
1 parent 09296bd commit bb5596d
Showing 1 changed file with 180 additions and 46 deletions.
226 changes: 180 additions & 46 deletions src/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { TopicValidatorResult } from '@libp2p/interface/pubsub'
import { TopicValidatorResult } from '@libp2p/interface/pubsub'
import type { IRPC } from './message/rpc.js'
import type { PeerScoreThresholds } from './score/peer-score-thresholds.js'
import {
Expand Down Expand Up @@ -203,26 +203,66 @@ export function getMetrics(
}),
/** Number of times we include peers in a topic mesh for different reasons.
* = rust-libp2p `mesh_peer_inclusion_events` */
meshPeerInclusionEvents: register.gauge<{ reason: InclusionReason }>({
name: 'gossipsub_mesh_peer_inclusion_events_total',
help: 'Number of times we include peers in a topic mesh for different reasons',
labelNames: ['reason']
meshPeerInclusionEventsFanout: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_mesh_peer_inclusion_events_fanout_total',
help: 'Number of times we include peers in a topic mesh for fanout reasons',
labelNames: ['topic']
}),
meshPeerInclusionEventsRandom: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_mesh_peer_inclusion_events_random_total',
help: 'Number of times we include peers in a topic mesh for random reasons',
labelNames: ['topic']
}),
meshPeerInclusionEventsSubscribed: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_mesh_peer_inclusion_events_subscribed_total',
help: 'Number of times we include peers in a topic mesh for subscribed reasons',
labelNames: ['topic']
}),
meshPeerInclusionEventsOutbound: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_mesh_peer_inclusion_events_outbound_total',
help: 'Number of times we include peers in a topic mesh for outbound reasons',
labelNames: ['topic']
}),
meshPeerInclusionEventsNotEnough: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_mesh_peer_inclusion_events_not_enough_total',
help: 'Number of times we include peers in a topic mesh for not_enough reasons',
labelNames: ['topic']
}),
meshPeerInclusionEventsOpportunistic: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_mesh_peer_inclusion_events_opportunistic_total',
help: 'Number of times we include peers in a topic mesh for opportunistic reasons',
labelNames: ['topic']
}),
meshPeerInclusionEventsByTopic: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_mesh_peer_inclusion_events_by_topic_total',
help: 'Number of times we include peers in a topic',
meshPeerInclusionEventsUnknown: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_mesh_peer_inclusion_events_unknown_total',
help: 'Number of times we include peers in a topic mesh for unknown reasons',
labelNames: ['topic']
}),
/** Number of times we remove peers in a topic mesh for different reasons.
* = rust-libp2p `mesh_peer_churn_events` */
meshPeerChurnEvents: register.gauge<{ reason: ChurnReason }>({
name: 'gossipsub_peer_churn_events_total',
help: 'Number of times we remove peers in a topic mesh for different reasons',
labelNames: ['reason']
meshPeerChurnEventsDisconnected: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_peer_churn_events_disconnected_total',
help: 'Number of times we remove peers in a topic mesh for disconnected reasons',
labelNames: ['topic']
}),
meshPeerChurnEventsByTopic: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_peer_churn_events_by_topic_total',
help: 'Number of times we remove peers in a topic',
meshPeerChurnEventsBadScore: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_peer_churn_events_bad_score_total',
help: 'Number of times we remove peers in a topic mesh for bad_score reasons',
labelNames: ['topic']
}),
meshPeerChurnEventsPrune: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_peer_churn_events_prune_total',
help: 'Number of times we remove peers in a topic mesh for prune reasons',
labelNames: ['topic']
}),
meshPeerChurnEventsExcess: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_peer_churn_events_excess_total',
help: 'Number of times we remove peers in a topic mesh for excess reasons',
labelNames: ['topic']
}),
meshPeerChurnEventsUnknown: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_peer_churn_events_excess_total',
help: 'Number of times we remove peers in a topic mesh for unknown reasons',
labelNames: ['topic']
}),

Expand Down Expand Up @@ -251,14 +291,24 @@ export function getMetrics(
/** Message validation results for each topic.
* Invalid == Reject?
* = rust-libp2p `invalid_messages`, `accepted_messages`, `ignored_messages`, `rejected_messages` */
asyncValidationResult: register.gauge<{ acceptance: TopicValidatorResult }>({
name: 'gossipsub_async_validation_result_total',
help: 'Message validation result',
labelNames: ['acceptance']
acceptedMessagesTotal: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_accepted_messages_total',
help: 'Total accepted messages for each topic',
labelNames: ['topic']
}),
asyncValidationResultByTopic: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_async_validation_result_by_topic_total',
help: 'Message validation result for each topic',
ignoredMessagesTotal: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_ignored_messages_total',
help: 'Total ignored messages for each topic',
labelNames: ['topic']
}),
rejectedMessagesTotal: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_rejected_messages_total',
help: 'Total rejected messages for each topic',
labelNames: ['topic']
}),
unknownValidationResultsTotal: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_unknown_validation_results_total',
help: 'Total unknown validation results for each topic',
labelNames: ['topic']
}),
/** When the user validates a message, it tries to re propagate it to its mesh peers. If the
Expand Down Expand Up @@ -333,11 +383,25 @@ export function getMetrics(
labelNames: ['topic']
}),
/** Total count of peers (by group) that we publish a msg to */
// NOTE: Do not use 'group' label since it's a generic already used by Prometheus to group instances
msgPublishPeersByGroup: register.gauge<{ peerGroup: keyof ToSendGroupCount }>({
name: 'gossipsub_msg_publish_peers_by_group',
help: 'Total count of peers (by group) that we publish a msg to',
labelNames: ['peerGroup']
directPeersPublishedTotal: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_direct_peers_published_total',
help: 'Total direct peers that we publish a msg to',
labelNames: ['topic']
}),
floodsubPeersPublishedTotal: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_floodsub_peers_published_total',
help: 'Total floodsub peers that we publish a msg to',
labelNames: ['topic']
}),
meshPeersPublishedTotal: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_mesh_peers_published_total',
help: 'Total mesh peers that we publish a msg to',
labelNames: ['topic']
}),
fanoutPeersPublishedTotal: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_fanout_peers_published_total',
help: 'Total fanout peers that we publish a msg to',
labelNames: ['topic']
}),
/** Total count of msg publish data.length bytes */
msgPublishBytes: register.gauge<{ topic: TopicLabel }>({
Expand Down Expand Up @@ -379,14 +443,24 @@ export function getMetrics(
labelNames: ['topic']
}),
/** Tracks distribution of recv msgs by duplicate, invalid, valid */
msgReceivedStatus: register.gauge<{ status: MessageStatus }>({
name: 'gossipsub_msg_received_status_total',
help: 'Tracks distribution of recv msgs by duplicate, invalid, valid',
labelNames: ['status']
msgReceivedInvalidTotal: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_msg_received_invalid_total',
help: 'Total count of invalid messages received',
labelNames: ['topic']
}),
msgReceivedValidTotal: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_msg_received_valid_total',
help: 'Total count of valid messages received',
labelNames: ['topic']
}),
msgReceivedDuplicateTotal: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_msg_received_duplicate_total',
help: 'Total count of duplicate messages received',
labelNames: ['topic']
}),
msgReceivedTopic: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_msg_received_topic_total',
help: 'Tracks distribution of recv msgs by topic label',
msgReceivedUnknownStatusTotal: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_msg_received_unknown_status_total',
help: 'Total count of unknown_status messages received',
labelNames: ['topic']
}),
/** Tracks specific reason of invalid */
Expand Down Expand Up @@ -626,8 +700,29 @@ export function getMetrics(
/** Register the inclusion of peers in our mesh due to some reason. */
onAddToMesh(topicStr: TopicStr, reason: InclusionReason, count: number): void {
const topic = this.toTopic(topicStr)
this.meshPeerInclusionEvents.inc({ reason }, count)
this.meshPeerInclusionEventsByTopic.inc({ topic }, count)
switch (reason) {
case InclusionReason.Fanout:
this.meshPeerInclusionEventsFanout.inc({ topic }, count)
break
case InclusionReason.Random:
this.meshPeerInclusionEventsRandom.inc({ topic }, count)
break
case InclusionReason.Subscribed:
this.meshPeerInclusionEventsSubscribed.inc({ topic }, count)
break
case InclusionReason.Outbound:
this.meshPeerInclusionEventsOutbound.inc({ topic }, count)
break
case InclusionReason.NotEnough:
this.meshPeerInclusionEventsNotEnough.inc({ topic }, count)
break
case InclusionReason.Opportunistic:
this.meshPeerInclusionEventsOpportunistic.inc({ topic }, count)
break
default:
this.meshPeerInclusionEventsUnknown.inc({ topic }, count)
break
}
},

/** Register the removal of peers in our mesh due to some reason */
Expand All @@ -637,8 +732,23 @@ export function getMetrics(
// - on_disconnect() Churn::Ds
onRemoveFromMesh(topicStr: TopicStr, reason: ChurnReason, count: number): void {
const topic = this.toTopic(topicStr)
this.meshPeerChurnEvents.inc({ reason }, count)
this.meshPeerChurnEventsByTopic.inc({ topic }, count)
switch (reason) {
case ChurnReason.Dc:
this.meshPeerChurnEventsDisconnected.inc({ topic }, count)
break
case ChurnReason.BadScore:
this.meshPeerChurnEventsBadScore.inc({ topic }, count)
break
case ChurnReason.Prune:
this.meshPeerChurnEventsPrune.inc({ topic }, count)
break
case ChurnReason.Excess:
this.meshPeerChurnEventsExcess.inc({ topic }, count)
break
default:
this.meshPeerChurnEventsUnknown.inc({ topic }, count)
break
}
},

/**
Expand All @@ -654,8 +764,20 @@ export function getMetrics(

if (messageRecord != null) {
const topic = this.toTopic(messageRecord.message.topic)
this.asyncValidationResult.inc({ acceptance })
this.asyncValidationResultByTopic.inc({ topic })
switch (acceptance) {
case TopicValidatorResult.Accept:
this.acceptedMessagesTotal.inc({ topic })
break
case TopicValidatorResult.Ignore:
this.ignoredMessagesTotal.inc({ topic })
break
case TopicValidatorResult.Reject:
this.rejectedMessagesTotal.inc({ topic })
break
default:
this.unknownValidationResultsTotal.inc({ topic })
break
}
}

if (firstSeenTimestampMs != null) {
Expand Down Expand Up @@ -708,10 +830,10 @@ export function getMetrics(
this.msgPublishCount.inc({ topic }, 1)
this.msgPublishBytes.inc({ topic }, tosendCount * dataLen)
this.msgPublishPeersByTopic.inc({ topic }, tosendCount)
this.msgPublishPeersByGroup.inc({ peerGroup: 'direct' }, tosendGroupCount.direct)
this.msgPublishPeersByGroup.inc({ peerGroup: 'floodsub' }, tosendGroupCount.floodsub)
this.msgPublishPeersByGroup.inc({ peerGroup: 'mesh' }, tosendGroupCount.mesh)
this.msgPublishPeersByGroup.inc({ peerGroup: 'fanout' }, tosendGroupCount.fanout)
this.directPeersPublishedTotal.inc({ topic }, tosendGroupCount.direct)
this.floodsubPeersPublishedTotal.inc({ topic }, tosendGroupCount.floodsub)
this.meshPeersPublishedTotal.inc({ topic }, tosendGroupCount.mesh)
this.fanoutPeersPublishedTotal.inc({ topic }, tosendGroupCount.fanout)
this.msgPublishTime.observe({ topic }, ms / 1000)
},

Expand All @@ -727,8 +849,20 @@ export function getMetrics(

onMsgRecvResult(topicStr: TopicStr, status: MessageStatus): void {
const topic = this.toTopic(topicStr)
this.msgReceivedTopic.inc({ topic })
this.msgReceivedStatus.inc({ status })
switch (status) {
case MessageStatus.duplicate:
this.msgReceivedDuplicateTotal.inc({ topic })
break
case MessageStatus.invalid:
this.msgReceivedInvalidTotal.inc({ topic })
break
case MessageStatus.valid:
this.msgReceivedValidTotal.inc({ topic })
break
default:
this.msgReceivedUnknownStatusTotal.inc({ topic })
break
}
},

onMsgRecvInvalid(topicStr: TopicStr, reason: RejectReasonObj): void {
Expand Down

0 comments on commit bb5596d

Please sign in to comment.