Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(archiver): do not attempt to decode blob before filtering #11668

Merged
merged 17 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ci3/tmux_split
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ session_name=$1
tmux kill-session -t "$session_name" 2>/dev/null || true

# Start a new tmux session with log level set
# Passing through env vars from run_native_testnet.sh otherwise they end up unset
tmux new-session -d -s "$session_name" -e LOG_LEVEL=${LOG_LEVEL:-"debug"} \
-e OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=${OTEL_EXPORTER_OTLP_LOGS_ENDPOINT:-} \
-e OTEL_EXPORTER_OTLP_METRICS_ENDPOINT=${OTEL_EXPORTER_OTLP_METRICS_ENDPOINT:-} \
-e OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=${OTEL_EXPORTER_OTLP_TRACES_ENDPOINT:-} \
-e L1_CONSENSUS_HOST_URL=${L1_CONSENSUS_HOST_URL:-} \
-e ETHEREUM_HOST=${ETHEREUM_HOST:-} \
-e LOG_JSON=${LOG_JSON:-}

shift 1
Expand Down
2 changes: 1 addition & 1 deletion spartan/aztec-network/eth-devnet/run-locally.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
REPO_ROOT=$(git rev-parse --show-toplevel)

${REPO_ROOT}/spartan/aztec-network/eth-devnet/create.sh
(cd ${REPO_ROOT}/spartan/aztec-network/eth-devnet && docker compose build &&docker compose up)
(cd ${REPO_ROOT}/spartan/aztec-network/eth-devnet && docker compose build && docker compose up)
20 changes: 17 additions & 3 deletions yarn-project/archiver/src/archiver/data_retrieval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { type BlobSinkClientInterface } from '@aztec/blob-sink/client';
import { Body, InboxLeaf, L2Block } from '@aztec/circuit-types';
import { AppendOnlyTreeSnapshot, BlockHeader, Fr, Proof } from '@aztec/circuits.js';
import { asyncPool } from '@aztec/foundation/async-pool';
import { Blob } from '@aztec/foundation/blob';
import { Blob, BlobDeserializationError } from '@aztec/foundation/blob';
import { type EthAddress } from '@aztec/foundation/eth-address';
import { type ViemSignature } from '@aztec/foundation/eth-signature';
import { type Logger, createLogger } from '@aztec/foundation/log';
Expand Down Expand Up @@ -109,6 +109,7 @@ export async function processL2BlockProposedLogs(
blobHashes,
l2BlockNumber,
rollup.address,
logger,
);

const l1: L1PublishedData = {
Expand Down Expand Up @@ -201,6 +202,7 @@ async function getBlockFromRollupTx(
blobHashes: Buffer[], // WORKTODO(md): buffer32?
l2BlockNum: bigint,
rollupAddress: Hex,
logger: Logger,
): Promise<L2Block> {
const { input: forwarderData, blockHash } = await publicClient.getTransaction({ hash: txHash });

Expand Down Expand Up @@ -232,13 +234,25 @@ async function getBlockFromRollupTx(
];

const header = BlockHeader.fromBuffer(Buffer.from(hexToBytes(decodedArgs.header)));

const blobBodies = await blobSinkClient.getBlobSidecar(blockHash, blobHashes);
if (blobBodies.length === 0) {
throw new NoBlobBodiesFoundError(Number(l2BlockNum));
}

const blockFields = blobBodies.flatMap(b => b.toEncodedFields());
// TODO(#9101): Once calldata is removed, we can remove this field encoding and update
// Body.fromBlobFields to accept blob buffers directly
let blockFields: Fr[];
try {
blockFields = blobBodies.flatMap(b => b.toEncodedFields());
} catch (err: any) {
if (err instanceof BlobDeserializationError) {
logger.fatal(err.message);
} else {
logger.fatal('Unable to sync: failed to decode fetched blob, this blob was likely not created by us');
}
throw err;
}

// TODO(#9101): Retrieving the block body from calldata is a temporary solution before we have
// either a beacon chain client or link to some blob store. Web2 is ok because we will
// verify the block body vs the blob as below.
Expand Down
62 changes: 42 additions & 20 deletions yarn-project/blob-sink/src/client/http.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Blob, makeEncodedBlob } from '@aztec/foundation/blob';
import { Blob, makeEncodedBlob, makeUnencodedBlob } from '@aztec/foundation/blob';
import { Fr } from '@aztec/foundation/fields';

import { jest } from '@jest/globals';
Expand Down Expand Up @@ -43,8 +43,11 @@ describe('HttpBlobSinkClient', () => {
describe('Mock Ethereum Clients', () => {
let blobSinkServer: BlobSinkServer;

let testBlob: Blob;
let testBlobHash: Buffer;
let testEncodedBlob: Blob;
let testEncodedBlobHash: Buffer;

let testNonEncodedBlob: Blob;
let testNonEncodedBlobHash: Buffer;

// A blob to be ignored when requesting blobs
// - we do not include its blobHash in our queries
Expand All @@ -59,10 +62,13 @@ describe('HttpBlobSinkClient', () => {
const MOCK_SLOT_NUMBER = 1;

beforeEach(async () => {
testBlob = await makeEncodedBlob(3);
testBlobHash = testBlob.getEthVersionedBlobHash();
testEncodedBlob = await makeEncodedBlob(3);
testEncodedBlobHash = testEncodedBlob.getEthVersionedBlobHash();

testBlobIgnore = await makeEncodedBlob(3);

testNonEncodedBlob = await makeUnencodedBlob(3);
testNonEncodedBlobHash = testNonEncodedBlob.getEthVersionedBlobHash();
});

const startExecutionHostServer = (): Promise<void> => {
Expand All @@ -89,14 +95,16 @@ describe('HttpBlobSinkClient', () => {
res.end(
JSON.stringify({
data: [
// Correctly encoded blob
{
index: 0,
blob: `0x${Buffer.from(testBlob.data).toString('hex')}`,
blob: `0x${Buffer.from(testEncodedBlob.data).toString('hex')}`,
// eslint-disable-next-line camelcase
kzg_commitment: `0x${testBlob.commitment.toString('hex')}`,
kzg_commitment: `0x${testEncodedBlob.commitment.toString('hex')}`,
// eslint-disable-next-line camelcase
kzg_proof: `0x${testBlob.proof.toString('hex')}`,
kzg_proof: `0x${testEncodedBlob.proof.toString('hex')}`,
},
// Correctly encoded blob, but we do not ask for it in the client
{
index: 1,
blob: `0x${Buffer.from(testBlobIgnore.data).toString('hex')}`,
Expand All @@ -105,6 +113,15 @@ describe('HttpBlobSinkClient', () => {
// eslint-disable-next-line camelcase
kzg_proof: `0x${testBlobIgnore.proof.toString('hex')}`,
},
// Incorrectly encoded blob
{
index: 2,
blob: `0x${Buffer.from(testNonEncodedBlob.data).toString('hex')}`,
// eslint-disable-next-line camelcase
kzg_commitment: `0x${testNonEncodedBlob.commitment.toString('hex')}`,
// eslint-disable-next-line camelcase
kzg_proof: `0x${testNonEncodedBlob.proof.toString('hex')}`,
},
],
}),
);
Expand Down Expand Up @@ -147,11 +164,11 @@ describe('HttpBlobSinkClient', () => {
l1RpcUrl: `http://localhost:${executionHostPort}`,
});

const success = await client.sendBlobsToBlobSink('0x1234', [testBlob]);
const success = await client.sendBlobsToBlobSink('0x1234', [testEncodedBlob]);
expect(success).toBe(true);

const retrievedBlobs = await client.getBlobSidecar('0x1234', [testBlobHash]);
expect(retrievedBlobs).toEqual([testBlob]);
const retrievedBlobs = await client.getBlobSidecar('0x1234', [testEncodedBlobHash]);
expect(retrievedBlobs).toEqual([testEncodedBlob]);

// Check that the blob sink was called with the correct block hash and no index
expect(blobSinkSpy).toHaveBeenCalledWith('0x1234', undefined);
Expand All @@ -160,25 +177,30 @@ describe('HttpBlobSinkClient', () => {
// When the consensus host is responding, we should request blobs from the consensus host
// based on the slot number
it('should request based on slot where consensus host is provided', async () => {
blobSinkServer = new BlobSinkServer({
port: 0,
await startExecutionHostServer();
await startConsensusHostServer();

const client = new HttpBlobSinkClient({
l1RpcUrl: `http://localhost:${executionHostPort}`,
l1ConsensusHostUrl: `http://localhost:${consensusHostPort}`,
});
await blobSinkServer.start();

const retrievedBlobs = await client.getBlobSidecar('0x1234', [testEncodedBlobHash]);
expect(retrievedBlobs).toEqual([testEncodedBlob]);
});

it('Even if we ask for non-encoded blobs, we should only get encoded blobs', async () => {
await startExecutionHostServer();
await startConsensusHostServer();

const client = new HttpBlobSinkClient({
blobSinkUrl: `http://localhost:${blobSinkServer.port}`,
l1RpcUrl: `http://localhost:${executionHostPort}`,
l1ConsensusHostUrl: `http://localhost:${consensusHostPort}`,
});

const success = await client.sendBlobsToBlobSink('0x1234', [testBlob]);
expect(success).toBe(true);

const retrievedBlobs = await client.getBlobSidecar('0x1234', [testBlobHash]);
expect(retrievedBlobs).toEqual([testBlob]);
const retrievedBlobs = await client.getBlobSidecar('0x1234', [testEncodedBlobHash, testNonEncodedBlobHash]);
// We should only get the correctly encoded blob
expect(retrievedBlobs).toEqual([testEncodedBlob]);
});
});
});
77 changes: 51 additions & 26 deletions yarn-project/blob-sink/src/client/http.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Blob, type BlobJson } from '@aztec/foundation/blob';
import { Blob, BlobDeserializationError, type BlobJson } from '@aztec/foundation/blob';
import { type Logger, createLogger } from '@aztec/foundation/log';
import { makeBackoff, retry } from '@aztec/foundation/retry';

Expand All @@ -15,13 +15,20 @@ export class HttpBlobSinkClient implements BlobSinkClientInterface {
this.config = config ?? getBlobSinkConfigFromEnv();
this.log = createLogger('aztec:blob-sink-client');
this.fetch = async (...args: Parameters<typeof fetch>): Promise<Response> => {
return await retry(() => fetch(...args), `Fetching ${args[0]}`, makeBackoff([1, 1, 3]), this.log);
return await retry(
() => fetch(...args),
`Fetching ${args[0]}`,
makeBackoff([1, 1, 3]),
this.log,
/*failSilently=*/ true,
);
};
}

public async sendBlobsToBlobSink(blockHash: string, blobs: Blob[]): Promise<boolean> {
// TODO(md): for now we are assuming the indexes of the blobs will be 0, 1, 2
// When in reality they will not, but for testing purposes this is fine
// Right now we fetch everything, then filter out the blobs that we don't want
if (!this.config.blobSinkUrl) {
this.log.verbose('No blob sink url configured');
return false;
Expand Down Expand Up @@ -49,7 +56,10 @@ export class HttpBlobSinkClient implements BlobSinkClientInterface {
this.log.error('Failed to send blobs to blob sink', res.status);
return false;
} catch (err) {
this.log.error(`Error sending blobs to blob sink`, err);
this.log.warn(`Blob sink url configured, but unable to send blobs`, {
blobSinkUrl: this.config.blobSinkUrl,
blockHash,
});
return false;
}
}
Expand All @@ -72,10 +82,14 @@ export class HttpBlobSinkClient implements BlobSinkClientInterface {
*/
public async getBlobSidecar(blockHash: string, blobHashes: Buffer[], indices?: number[]): Promise<Blob[]> {
let blobs: Blob[] = [];

if (this.config.blobSinkUrl) {
this.log.debug('Getting blob sidecar from blob sink');
blobs = await this.getBlobSidecarFrom(this.config.blobSinkUrl, blockHash, indices);
blobs = await this.getBlobSidecarFrom(this.config.blobSinkUrl, blockHash, blobHashes, indices);
this.log.debug(`Got ${blobs.length} blobs from blob sink`);
if (blobs.length > 0) {
return blobs;
}
}

if (blobs.length == 0 && this.config.l1ConsensusHostUrl) {
Expand All @@ -86,28 +100,24 @@ export class HttpBlobSinkClient implements BlobSinkClientInterface {
});
const slotNumber = await this.getSlotNumber(blockHash);
if (slotNumber) {
const blobs = await this.getBlobSidecarFrom(this.config.l1ConsensusHostUrl, slotNumber, indices);
const blobs = await this.getBlobSidecarFrom(this.config.l1ConsensusHostUrl, slotNumber, blobHashes, indices);
this.log.debug(`Got ${blobs.length} blobs from consensus host`);
if (blobs.length > 0) {
return blobs;
}
}
}

if (blobs.length > 0) {
return filterRelevantBlobs(blobs, blobHashes);
}

this.log.verbose('No blob sources available');
return [];
}

public async getBlobSidecarFrom(
hostUrl: string,
blockHashOrSlot: string | number,
blobHashes: Buffer[],
indices?: number[],
): Promise<Blob[]> {
// TODO(md): right now we assume all blobs are ours, this will not yet work on sepolia
try {
let baseUrl = `${hostUrl}/eth/v1/beacon/blob_sidecars/${blockHashOrSlot}`;
if (indices && indices.length > 0) {
Expand All @@ -122,14 +132,42 @@ export class HttpBlobSinkClient implements BlobSinkClientInterface {

if (res.ok) {
const body = await res.json();
const blobs = await Promise.all(body.data.map((b: BlobJson) => Blob.fromJson(b)));
return blobs;
const preFilteredBlobsPromise = body.data
// Filter out blobs that did not come from our rollup
.filter((b: BlobJson) => {
const commitment = Buffer.from(b.kzg_commitment.slice(2), 'hex');
const blobHash = Blob.getEthVersionedBlobHash(commitment);
return blobHashes.some(hash => hash.equals(blobHash));
})
// Attempt to deserialise the blob
// If we cannot decode it, then it is malicious and we should not use it
.map(async (b: BlobJson): Promise<Blob | undefined> => {
try {
return await Blob.fromJson(b);
} catch (err) {
if (err instanceof BlobDeserializationError) {
this.log.warn(`Failed to deserialise blob`, { commitment: b.kzg_commitment });
return undefined;
}
throw err;
}
});

// Second map is async, so we need to await it
const preFilteredBlobs = await Promise.all(preFilteredBlobsPromise);

// Filter out blobs that did not deserialise
const filteredBlobs = preFilteredBlobs.filter((b: Blob | undefined) => {
return b !== undefined;
});

return filteredBlobs;
}

this.log.debug(`Unable to get blob sidecar`, res.status);
return [];
} catch (err: any) {
this.log.error(`Unable to get blob sidecar`, err.message);
this.log.warn(`Unable to get blob sidecar from ${hostUrl}`, err.message);
return [];
}
}
Expand Down Expand Up @@ -210,19 +248,6 @@ export class HttpBlobSinkClient implements BlobSinkClientInterface {
}
}

/**
* Filter blobs based on a list of blob hashes
* @param blobs
* @param blobHashes
* @returns
*/
function filterRelevantBlobs(blobs: Blob[], blobHashes: Buffer[]): Blob[] {
return blobs.filter(blob => {
const blobHash = blob.getEthVersionedBlobHash();
return blobHashes.some(hash => hash.equals(blobHash));
});
}

function getBeaconNodeFetchOptions(url: string, config: BlobSinkConfig) {
let formattedUrl = url;
if (config.l1ConsensusHostApiKey && !config.l1ConsensusHostApiKeyHeader) {
Expand Down
Loading