diff --git a/.changeset/fast-icons-admire.md b/.changeset/fast-icons-admire.md new file mode 100644 index 0000000000..0170623aaa --- /dev/null +++ b/.changeset/fast-icons-admire.md @@ -0,0 +1,5 @@ +--- +"@farcaster/hubble": patch +--- + +feat: add missing messages to sync trie via sync health job diff --git a/apps/hubble/src/network/sync/syncHealthJob.ts b/apps/hubble/src/network/sync/syncHealthJob.ts index a61f5c171b..d1d8c80189 100644 --- a/apps/hubble/src/network/sync/syncHealthJob.ts +++ b/apps/hubble/src/network/sync/syncHealthJob.ts @@ -7,6 +7,7 @@ import { peerIdFromString } from "@libp2p/peer-id"; import { bytesToHexString, fromFarcasterTime, Message, UserDataType } from "@farcaster/hub-nodejs"; import { Result } from "neverthrow"; import { SubmitError } from "../../utils/syncHealth.js"; +import { SyncId } from "./syncId.js"; const log = logger.child({ component: "SyncHealth", @@ -75,9 +76,15 @@ export class MeasureSyncHealthJobScheduler { return undefined; } - processSumbitResults(results: Result[], peerId: string, startTime: number, stopTime: number) { + async processSumbitResults( + results: Result[], + peerId: string, + startTime: number, + stopTime: number, + ) { let numSuccesses = 0; let numErrors = 0; + let numAlreadyMerged = 0; for (const result of results) { if (result.isOk()) { const hashString = bytesToHexString(result.value.hash); @@ -105,25 +112,31 @@ export class MeasureSyncHealthJobScheduler { } else { const hashString = bytesToHexString(result.error.originalMessage.hash); const hash = hashString.isOk() ? hashString.value : "unable to show hash"; - log.info( - { - errMessage: result.error.hubError.message, - peerId, - startTime, - stopTime, - msgDetails: { - fid: result.error.originalMessage.data?.fid, - timestamp: this.unixTimestampFromMessage(result.error.originalMessage), - hash, - }, - }, - "Failed to submit message via SyncHealth", - ); - numErrors += 1; + const logTags = { + errMessage: result.error.hubError.message, + peerId, + startTime, + stopTime, + msgDetails: { + fid: result.error.originalMessage.data?.fid, + timestamp: this.unixTimestampFromMessage(result.error.originalMessage), + hash, + }, + }; + if (result.error.hubError.errCode === "bad_request.duplicate") { + // This message has already been merged into the DB, but for some reason is not in the Trie. + // Just update the trie. + await this._metadataRetriever._syncEngine.trie.insert(SyncId.fromMessage(result.error.originalMessage)); + log.info(logTags, "Merged missing message into sync trie via SyncHealth"); + numAlreadyMerged += 1; + } else { + log.info(logTags, "Failed to submit message via SyncHealth"); + numErrors += 1; + } } } - return { numSuccesses, numErrors }; + return { numSuccesses, numErrors, numAlreadyMerged }; } async doJobs() { @@ -178,13 +191,15 @@ export class MeasureSyncHealthJobScheduler { continue; } + const processedResults = await this.processSumbitResults(resultsPushingToUs.value, peerId, startTime, stopTime); + log.info( { ourNumMessages: syncHealthMessageStats.value.primaryNumMessages, theirNumMessages: syncHealthMessageStats.value.peerNumMessages, syncHealth: syncHealthMessageStats.value.computeDiff(), syncHealthPercentage: syncHealthMessageStats.value.computeDiffPercentage(), - resultsPushingToUs: this.processSumbitResults(resultsPushingToUs.value, peerId, startTime, stopTime), + resultsPushingToUs: processedResults, peerId, startTime, stopTime,