Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-13858 - Record inbound message rate in the loopback case #3962

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import net.corda.p2p.linkmanager.membership.NetworkMessagingValidator
import net.corda.p2p.linkmanager.sessions.SessionManager
import net.corda.data.p2p.markers.AppMessageMarker
import net.corda.data.p2p.markers.LinkManagerReceivedMarker
import net.corda.metrics.CordaMetrics
import net.corda.p2p.linkmanager.metrics.recordInboundMessagesMetric
import net.corda.schema.Schemas
import net.corda.utilities.debug
import net.corda.utilities.time.Clock
Expand Down Expand Up @@ -288,35 +288,6 @@ internal class InboundMessageProcessor(
)
}

private fun recordInboundMessagesMetric(message: AuthenticatedMessage) {
message.header.let {
recordInboundMessagesMetric(it.source.x500Name, it.destination.x500Name, it.source.groupId,
it.subsystem, message::class.java.simpleName)
}
}

private fun recordInboundMessagesMetric(message: InboundUnauthenticatedMessage) {
recordInboundMessagesMetric(null, null, null,
message.header.subsystem, message::class.java.simpleName)
}

private fun recordInboundMessagesMetric(source: String?, dest: String?, group: String?, subsystem: String, messageType: String) {
val builder = CordaMetrics.Metric.InboundMessageCount.builder()
listOf(
CordaMetrics.Tag.SourceVirtualNode to source,
CordaMetrics.Tag.DestinationVirtualNode to dest,
CordaMetrics.Tag.MembershipGroup to group,
CordaMetrics.Tag.MessagingSubsystem to subsystem,
CordaMetrics.Tag.MessageType to messageType,
).forEach {
val value = it.second
if (value != null) {
builder.withTag(it.first, value)
}
}
builder.build().increment()
}

private fun <T> checkAllowedCommunication(
counterparties: SessionManager.Counterparties,
func: () -> T
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package net.corda.p2p.linkmanager.metrics

import net.corda.data.p2p.app.AuthenticatedMessage
import net.corda.data.p2p.app.InboundUnauthenticatedMessage
import net.corda.metrics.CordaMetrics
import net.corda.metrics.CordaMetrics.NOT_APPLICABLE_TAG_VALUE

fun recordInboundMessagesMetric(message: AuthenticatedMessage) {
message.header.let {
recordInboundMessagesMetric(it.source.x500Name, it.destination.x500Name, it.source.groupId,
it.subsystem, message::class.java.simpleName)
}
}

fun recordInboundMessagesMetric(message: InboundUnauthenticatedMessage) {
recordInboundMessagesMetric(null, null, null,
message.header.subsystem, message::class.java.simpleName)
}

private fun recordInboundMessagesMetric(source: String?, dest: String?, group: String?, subsystem: String, messageType: String) {
val builder = CordaMetrics.Metric.InboundMessageCount.builder()
listOf(
CordaMetrics.Tag.SourceVirtualNode to source,
CordaMetrics.Tag.DestinationVirtualNode to dest,
CordaMetrics.Tag.MembershipGroup to group,
CordaMetrics.Tag.MessagingSubsystem to subsystem,
CordaMetrics.Tag.MessageType to messageType,
).forEach {
val value = it.second ?: NOT_APPLICABLE_TAG_VALUE
builder.withTag(it.first, value)
}
builder.build().increment()
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import net.corda.p2p.linkmanager.hosting.LinkManagerHostingMap
import net.corda.p2p.linkmanager.inbound.InboundAssignmentListener
import net.corda.p2p.linkmanager.membership.NetworkMessagingValidator
import net.corda.p2p.linkmanager.membership.lookup
import net.corda.p2p.linkmanager.metrics.recordInboundMessagesMetric
import net.corda.p2p.linkmanager.sessions.PendingSessionMessageQueues
import net.corda.p2p.linkmanager.sessions.SessionManager
import net.corda.schema.Schemas
Expand Down Expand Up @@ -199,6 +200,7 @@ internal class OutboundMessageProcessor(
message.payload,
)
if (linkManagerHostingMap.isHostedLocally(message.header.destination.toCorda())) {
recordInboundMessagesMetric(inboundMessage)
return listOf(Record(Schemas.P2P.P2P_IN_TOPIC, LinkManager.generateKey(), AppMessage(inboundMessage)))
} else if (destMemberInfo != null) {
val source = message.header.source.toCorda()
Expand Down Expand Up @@ -264,6 +266,7 @@ internal class OutboundMessageProcessor(
val source = messageAndKey.message.header.source.toCorda()
val destination = messageAndKey.message.header.destination.toCorda()
if (linkManagerHostingMap.isHostedLocally(destination)) {
recordInboundMessagesMetric(messageAndKey.message)
return if (isReplay) {
listOf(Record(Schemas.P2P.P2P_IN_TOPIC, messageAndKey.key, AppMessage(messageAndKey.message)),
recordForLMReceivedMarker(messageAndKey.message.header.messageId)
Expand Down