Skip to content

Commit

Permalink
feat: Validator deadline for reexecution
Browse files Browse the repository at this point in the history
Adds a reexecution deadline for validators, so they abort trying to
reexecute public txs while block building if too late into the slot.

Fixes #10959
  • Loading branch information
spalladino committed Jan 13, 2025
1 parent 0b3088b commit 3e5a6fc
Show file tree
Hide file tree
Showing 11 changed files with 266 additions and 89 deletions.
1 change: 1 addition & 0 deletions yarn-project/end-to-end/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"@aztec/simulator": "workspace:^",
"@aztec/telemetry-client": "workspace:^",
"@aztec/types": "workspace:^",
"@aztec/validator-client": "workspace:^",
"@aztec/world-state": "workspace:^",
"@iarna/toml": "^2.2.5",
"@jest/globals": "^29.5.0",
Expand Down
178 changes: 121 additions & 57 deletions yarn-project/end-to-end/src/e2e_p2p/reex.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { type SentTx, sleep } from '@aztec/aztec.js';

/* eslint-disable-next-line no-restricted-imports */
import { BlockProposal, SignatureDomainSeparator, getHashedSignaturePayload } from '@aztec/circuit-types';
import { type PublicTxSimulator } from '@aztec/simulator';
import { ReExFailedTxsError, ReExStateMismatchError, ReExTimeoutError } from '@aztec/validator-client/errors';

import { beforeAll, describe, it, jest } from '@jest/globals';
import fs from 'fs';
Expand Down Expand Up @@ -31,6 +33,7 @@ describe('e2e_p2p_reex', () => {
basePort: BOOT_NODE_UDP_PORT,
// To collect metrics - run in aztec-packages `docker compose --profile metrics up` and set COLLECT_METRICS=true
metricsPort: shouldCollectMetrics(),
initialConfig: { enforceTimeTable: true },
});

t.logger.verbose('Setup account');
Expand All @@ -55,30 +58,10 @@ describe('e2e_p2p_reex', () => {
}
});

it('validators should re-execute transactions before attesting', async () => {
// create the bootstrap node for the network
if (!t.bootstrapNodeEnr) {
throw new Error('Bootstrap node ENR is not available');
}

t.ctx.aztecNodeConfig.validatorReexecute = true;

nodes = await createNodes(
t.ctx.aztecNodeConfig,
t.ctx.dateProvider,
t.bootstrapNodeEnr,
NUM_NODES,
BOOT_NODE_UDP_PORT,
DATA_DIR,
// To collect metrics - run in aztec-packages `docker compose --profile metrics up` and set COLLECT_METRICS=true
shouldCollectMetrics(),
);

// Hook into the node and intercept re-execution logic, ensuring that it was infact called
const reExecutionSpies = [];
for (const node of nodes) {
// Make sure the nodes submit faulty proposals, in this case a faulty proposal is one where we remove one of the transactions
// Such that the calculated archive will be different!
describe('validators re-execute transactions before attesting', () => {
// Make sure the nodes submit faulty proposals, in this case a faulty proposal is one where we remove one of the transactions
// Such that the calculated archive will be different!
const interceptBroadcastProposal = (node: AztecNodeService) => {
jest.spyOn((node as any).p2pClient, 'broadcastProposal').mockImplementation(async (...args: unknown[]) => {
// We remove one of the transactions, therefore the block root will be different!
const proposal = args[0] as BlockProposal;
Expand All @@ -97,40 +80,121 @@ describe('e2e_p2p_reex', () => {

return (node as any).p2pClient.p2pService.propagate(newProposal);
});

// Store re-execution spys node -> sequencer Client -> seqeuncer -> validator
const spy = jest.spyOn((node as any).sequencer.sequencer.validatorClient, 'reExecuteTransactions');
reExecutionSpies.push(spy);
}

// wait a bit for peers to discover each other
await sleep(4000);

nodes.forEach(node => {
node.getSequencer()?.updateSequencerConfig({
minTxsPerBlock: NUM_TXS_PER_NODE,
maxTxsPerBlock: NUM_TXS_PER_NODE,
};

// Intercepts the simulator within the tx processor within the processor factory with the given function
// Only the processor for validators is intercepted, the one for the proposer is left untouched
// We abuse the fact that the proposer will always run before the validators
let interceptTxProcessorSimulatorCallCount = 0;
const interceptTxProcessorSimulator = (
node: AztecNodeService,
intercept: (simulator: PublicTxSimulator) => void,
) => {
const processorFactory = (node as any).sequencer.sequencer.publicProcessorFactory;
const originalCreate = processorFactory.create.bind(processorFactory);
jest.spyOn(processorFactory, 'create').mockImplementation((...args: unknown[]) => {
interceptTxProcessorSimulatorCallCount++;
const processor = originalCreate(...args);
if (interceptTxProcessorSimulatorCallCount > 1) {
t.logger.warn('Creating mocked processor factory');
const simulator = (processor as any).publicTxSimulator;
intercept(simulator);
} else {
t.logger.warn('Creating vanilla processor factory');
}
return processor;
});
});
const txs = await submitComplexTxsTo(t.logger, t.spamContract!, NUM_TXS_PER_NODE);

// We ensure that the transactions are NOT mined
try {
await Promise.all(
txs.map(async (tx: SentTx, i: number) => {
t.logger.info(`Waiting for tx ${i}: ${await tx.getTxHash()} to be mined`);
return tx.wait();
}),
);
} catch (e) {
t.logger.info('Failed to mine all txs, as planned');
}
};

// Have the public tx processor take an extra long time to process the tx, so the validator times out
const interceptTxProcessorWithTimeout = (node: AztecNodeService) => {
interceptTxProcessorSimulator(node, simulator => {
const anySimulator: any = simulator;
const originalSimulate = anySimulator.simulate.bind(simulator);
jest.spyOn(anySimulator, 'simulate').mockImplementation(async (...args: unknown[]) => {
t.logger.warn('Public tx simulator sleeping for 40s to simulate timeout');
await sleep(40_000);
return originalSimulate(...args);
});
});
};

// Have the public tx processor throw when processing a tx
const interceptTxProcessorWithFailure = (node: AztecNodeService) => {
interceptTxProcessorSimulator(node, simulator => {
const anySimulator: any = simulator;
jest.spyOn(anySimulator, 'process').mockImplementation(async () => {
t.logger.warn('Public tx simulator failing');
await sleep(1);
throw new Error(`Fake tx failure`);
});
});
};

it.each([
['ReExStateMismatchError', new ReExStateMismatchError().message, interceptBroadcastProposal],
['ReExTimeoutError', new ReExTimeoutError().message, interceptTxProcessorWithTimeout],
['ReExFailedTxsError', new ReExFailedTxsError(1).message, interceptTxProcessorWithFailure],
])(
'rejects proposal with %s',
async (_errType: string, errMsg: string, nodeInterceptor: (node: AztecNodeService) => void) => {
// create the bootstrap node for the network
if (!t.bootstrapNodeEnr) {
throw new Error('Bootstrap node ENR is not available');
}

t.ctx.aztecNodeConfig.validatorReexecute = true;

nodes = await createNodes(
t.ctx.aztecNodeConfig,
t.ctx.dateProvider,
t.bootstrapNodeEnr,
NUM_NODES,
BOOT_NODE_UDP_PORT,
DATA_DIR,
// To collect metrics - run in aztec-packages `docker compose --profile metrics up` and set COLLECT_METRICS=true
shouldCollectMetrics(),
);

// Expect that all of the re-execution attempts failed with an invalid root
for (const spy of reExecutionSpies) {
for (const result of spy.mock.results) {
await expect(result.value).rejects.toThrow('Validator Error: Re-execution state mismatch');
}
}
// Hook into the node and intercept re-execution logic
const reExecutionSpies = [];
for (const node of nodes) {
nodeInterceptor(node);
// Collect re-execution spies node -> sequencer client -> sequencer -> validator
const spy = jest.spyOn((node as any).sequencer.sequencer.validatorClient, 'reExecuteTransactions');
reExecutionSpies.push(spy);
}

// Wait a bit for peers to discover each other
await sleep(4000);

nodes.forEach(node => {
node.getSequencer()?.updateSequencerConfig({
minTxsPerBlock: NUM_TXS_PER_NODE,
maxTxsPerBlock: NUM_TXS_PER_NODE,
});
});
const txs = await submitComplexTxsTo(t.logger, t.spamContract!, NUM_TXS_PER_NODE, { callPublic: true });

// We ensure that the transactions are NOT mined
try {
await Promise.all(
txs.map(async (tx: SentTx, i: number) => {
t.logger.info(`Waiting for tx ${i}: ${await tx.getTxHash()} to be mined`);
return tx.wait();
}),
);
} catch (e) {
t.logger.info('Failed to mine all txs, as planned');
}

// Expect that all of the re-execution attempts failed with an invalid root
for (const spy of reExecutionSpies) {
for (const result of spy.mock.results) {
await expect(result.value).rejects.toThrow(errMsg);
}
}
},
);
});
});
11 changes: 9 additions & 2 deletions yarn-project/end-to-end/src/e2e_p2p/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,20 @@ import { type PXEService, createPXEService, getPXEServiceConfig as getRpcConfig
import { type NodeContext } from '../fixtures/setup_p2p_test.js';

// submits a set of transactions to the provided Private eXecution Environment (PXE)
export const submitComplexTxsTo = async (logger: Logger, spamContract: SpamContract, numTxs: number) => {
export const submitComplexTxsTo = async (
logger: Logger,
spamContract: SpamContract,
numTxs: number,
opts: { callPublic?: boolean } = {},
) => {
const txs: SentTx[] = [];

const seed = 1234n;
const spamCount = 15;
for (let i = 0; i < numTxs; i++) {
const tx = spamContract.methods.spam(seed + BigInt(i * spamCount), spamCount, false).send();
const tx = spamContract.methods
.spam(seed + BigInt(i * spamCount), spamCount, !!opts.callPublic)
.send({ skipPublicSimulation: true });
const txHash = await tx.getTxHash();

logger.info(`Tx sent with hash ${txHash}`);
Expand Down
44 changes: 30 additions & 14 deletions yarn-project/end-to-end/src/fixtures/setup_p2p_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
* Test fixtures and utilities to set up and run a test using multiple validators
*/
import { type AztecNodeConfig, AztecNodeService } from '@aztec/aztec-node';
import { type SentTx, createLogger } from '@aztec/aztec.js';
import { type SentTx } from '@aztec/aztec.js';
import { type AztecAddress } from '@aztec/circuits.js';
import { addLogNameHandler, removeLogNameHandler } from '@aztec/foundation/log';
import { type DateProvider } from '@aztec/foundation/timer';
import { type PXEService } from '@aztec/pxe';

import getPort from 'get-port';
import { AsyncLocalStorage } from 'node:async_hooks';

import { TEST_PEER_CHECK_INTERVAL_MS } from './fixtures.js';
import { getPrivateKeyFromIndex } from './utils.js';
Expand All @@ -34,7 +36,7 @@ export function generatePrivateKeys(startIndex: number, numberOfKeys: number): `
return privateKeys;
}

export function createNodes(
export async function createNodes(
config: AztecNodeConfig,
dateProvider: DateProvider,
bootstrapNodeEnr: string,
Expand All @@ -43,16 +45,32 @@ export function createNodes(
dataDirectory?: string,
metricsPort?: number,
): Promise<AztecNodeService[]> {
const nodePromises = [];
const nodePromises: Promise<AztecNodeService>[] = [];
const loggerIdStorage = new AsyncLocalStorage<string>();
const logNameHandler = (module: string) =>
loggerIdStorage.getStore() ? `${module}:${loggerIdStorage.getStore()}` : module;
addLogNameHandler(logNameHandler);

for (let i = 0; i < numNodes; i++) {
// We run on ports from the bootnode upwards
const port = bootNodePort + i + 1;

const dataDir = dataDirectory ? `${dataDirectory}-${i}` : undefined;
const nodePromise = createNode(config, dateProvider, port, bootstrapNodeEnr, i, dataDir, metricsPort);
const nodePromise = createNode(
config,
dateProvider,
port,
bootstrapNodeEnr,
i,
dataDir,
metricsPort,
loggerIdStorage,
);
nodePromises.push(nodePromise);
}
return Promise.all(nodePromises);
const nodes = await Promise.all(nodePromises);
removeLogNameHandler(logNameHandler);
return nodes;
}

// creates a P2P enabled instance of Aztec Node Service
Expand All @@ -64,16 +82,14 @@ export async function createNode(
accountIndex: number,
dataDirectory?: string,
metricsPort?: number,
loggerIdStorage?: AsyncLocalStorage<string>,
) {
const validatorConfig = await createValidatorConfig(config, bootstrapNode, tcpPort, accountIndex, dataDirectory);

const telemetryClient = await getEndToEndTestTelemetryClient(metricsPort);

return await AztecNodeService.createAndSync(validatorConfig, {
telemetry: telemetryClient,
logger: createLogger(`node:${tcpPort}`),
dateProvider,
});
const createNode = async () => {
const validatorConfig = await createValidatorConfig(config, bootstrapNode, tcpPort, accountIndex, dataDirectory);
const telemetry = await getEndToEndTestTelemetryClient(metricsPort);
return await AztecNodeService.createAndSync(validatorConfig, { telemetry, dateProvider });
};
return loggerIdStorage ? await loggerIdStorage.run(tcpPort.toString(), createNode) : createNode();
}

export async function createValidatorConfig(
Expand Down
3 changes: 3 additions & 0 deletions yarn-project/end-to-end/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@
{
"path": "../types"
},
{
"path": "../validator-client"
},
{
"path": "../world-state"
}
Expand Down
18 changes: 17 additions & 1 deletion yarn-project/foundation/src/log/pino-logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { type LogLevel } from './log-levels.js';
import { type LogData, type LogFn } from './log_fn.js';

export function createLogger(module: string): Logger {
module = module.replace(/^aztec:/, '');
module = logNameHandlers.reduce((moduleName, handler) => handler(moduleName), module.replace(/^aztec:/, ''));
const pinoLogger = logger.child({ module }, { level: getLogLevelFromFilters(logFilters, module) });

// We check manually for isLevelEnabled to avoid calling processLogData unnecessarily.
Expand Down Expand Up @@ -56,6 +56,22 @@ function processLogData(data: LogData): LogData {
return logDataHandlers.reduce((accum, handler) => handler(accum), data);
}

// Allow global hooks for tweaking module names.
// Used in tests to add a uid to modules, so we can differentiate multiple nodes in the same process.
type LogNameHandler = (module: string) => string;
const logNameHandlers: LogNameHandler[] = [];

export function addLogNameHandler(handler: LogNameHandler): void {
logNameHandlers.push(handler);
}

export function removeLogNameHandler(handler: LogNameHandler) {
const index = logNameHandlers.indexOf(handler);
if (index !== -1) {
logNameHandlers.splice(index, 1);
}
}

// Patch isLevelEnabled missing from pino/browser.
function isLevelEnabled(logger: pino.Logger<'verbose', boolean>, level: LogLevel): boolean {
return typeof logger.isLevelEnabled === 'function'
Expand Down
Loading

0 comments on commit 3e5a6fc

Please sign in to comment.