diff --git a/src/index.ts b/src/index.ts index 27a632f7..57b469db 100644 --- a/src/index.ts +++ b/src/index.ts @@ -935,16 +935,26 @@ export class GossipSub extends EventEmitter implements PubSub this.log(err)) + this.handleReceivedRpc(peerId, rpc).catch((err) => { + this.metrics?.onRpcRecvError() + this.log(err) + }) } } catch (e) { + this.metrics?.onRpcDataError() this.log(e as Error) } } }) } catch (err) { + this.metrics?.onPeerReadStreamError() this.handlePeerReadStreamError(err as Error, peerId) } } @@ -969,7 +979,21 @@ export class GossipSub extends EventEmitter implements PubSub 0) { @@ -1013,7 +1037,10 @@ export class GossipSub extends EventEmitter implements PubSub this.log(err)) + .catch((err) => { + this.metrics?.onMsgRecvError(message.topic) + this.log(err) + }) if (this.opts.awaitRpcMessageHandler) { await handleReceivedMessagePromise diff --git a/src/metrics.ts b/src/metrics.ts index d7fd2824..68b8eea2 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -249,6 +249,12 @@ export function getMetrics( labelNames: ['hit'] }), + // peer stream + peerReadStreamError: register.gauge({ + name: 'gossipsub_peer_read_stream_err_count_total', + help: 'Peer read stream error' + }), + // RPC outgoing. Track byte length + data structure sizes rpcRecvBytes: register.gauge({ name: 'gossipsub_rpc_recv_bytes_total', help: 'RPC recv' }), rpcRecvCount: register.gauge({ name: 'gossipsub_rpc_recv_count_total', help: 'RPC recv' }), @@ -259,6 +265,8 @@ export function getMetrics( rpcRecvIWant: register.gauge({ name: 'gossipsub_rpc_recv_iwant_total', help: 'RPC recv' }), rpcRecvGraft: register.gauge({ name: 'gossipsub_rpc_recv_graft_total', help: 'RPC recv' }), rpcRecvPrune: register.gauge({ name: 'gossipsub_rpc_recv_prune_total', help: 'RPC recv' }), + rpcDataError: register.gauge({ name: 'gossipsub_rpc_data_err_count_total', help: 'RPC data error' }), + rpcRecvError: register.gauge({ name: 'gossipsub_rpc_recv_err_count_total', help: 'RPC recv error' }), /** Total count of RPC dropped because acceptFrom() == false */ rpcRecvNotAccepted: register.gauge({ @@ -323,6 +331,12 @@ export function getMetrics( help: 'Total count of recv msgs before any validation', labelNames: ['topic'] }), + /** Total count of recv msgs error */ + msgReceivedError: register.gauge<{ topic: TopicLabel }>({ + name: 'gossipsub_msg_received_error_total', + help: 'Total count of recv msgs error', + labelNames: ['topic'] + }), /** Tracks distribution of recv msgs by duplicate, invalid, valid */ msgReceivedStatus: register.gauge<{ topic: TopicLabel; status: MessageStatus }>({ name: 'gossipsub_msg_received_status_total', @@ -615,6 +629,11 @@ export function getMetrics( this.msgReceivedPreValidation.inc({ topic }, 1) }, + onMsgRecvError(topicStr: TopicStr): void { + const topic = this.toTopic(topicStr) + this.msgReceivedError.inc({ topic }, 1) + }, + onMsgRecvResult(topicStr: TopicStr, status: MessageStatus): void { const topic = this.toTopic(topicStr) this.msgReceivedStatus.inc({ topic, status }) @@ -640,6 +659,18 @@ export function getMetrics( this.duplicateMsgIgnored.inc({ topic }, 1) }, + onPeerReadStreamError(): void { + this.peerReadStreamError.inc(1) + }, + + onRpcRecvError(): void { + this.rpcRecvError.inc(1) + }, + + onRpcDataError(): void { + this.rpcDataError.inc(1) + }, + onRpcRecv(rpc: IRPC, rpcBytes: number): void { this.rpcRecvBytes.inc(rpcBytes) this.rpcRecvCount.inc(1)