Skip to content

Commit 810f4a5

Browse files
nflaigphilknows
authored andcommitted
feat: add option to disable thread pool for keystore decryption (#6949)
1 parent f20484b commit 810f4a5

File tree

4 files changed

+77
-25
lines changed

4 files changed

+77
-25
lines changed

packages/cli/src/cmds/validator/keymanager/decryptKeystoreDefinitions.ts

+52-17
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
1+
import fs from "node:fs";
12
import path from "node:path";
23
import bls from "@chainsafe/bls";
4+
import {Keystore} from "@chainsafe/bls-keystore";
35
import {SignerLocal, SignerType} from "@lodestar/validator";
46
import {LogLevel, Logger} from "@lodestar/utils";
57
import {lockFilepath, unlockFilepath} from "../../../util/lockfile.js";
68
import {LocalKeystoreDefinition} from "./interface.js";
79
import {clearKeystoreCache, loadKeystoreCache, writeKeystoreCache} from "./keystoreCache.js";
810
import {DecryptKeystoresThreadPool} from "./decryptKeystores/index.js";
911

10-
type KeystoreDecryptOptions = {
12+
export type KeystoreDecryptOptions = {
1113
ignoreLockFile?: boolean;
1214
onDecrypt?: (index: number) => void;
1315
// Try to use the cache file if it exists
1416
cacheFilePath?: string;
17+
/** Use main thread to decrypt keystores */
18+
disableThreadPool?: boolean;
1519
logger: Pick<Logger, LogLevel.info | LogLevel.warn | LogLevel.debug>;
1620
signal: AbortSignal;
1721
};
@@ -57,14 +61,50 @@ export async function decryptKeystoreDefinitions(
5761
const signers = new Array<SignerLocal>(keystoreCount);
5862
const passwords = new Array<string>(keystoreCount);
5963
const errors: KeystoreDecryptError[] = [];
60-
const decryptKeystores = new DecryptKeystoresThreadPool(keystoreCount, opts.signal);
6164

62-
for (const [index, definition] of keystoreDefinitions.entries()) {
63-
lockKeystore(definition.keystorePath, opts);
65+
if (!opts.disableThreadPool) {
66+
const decryptKeystores = new DecryptKeystoresThreadPool(keystoreCount, opts.signal);
67+
68+
for (const [index, definition] of keystoreDefinitions.entries()) {
69+
lockKeystore(definition.keystorePath, opts);
70+
71+
decryptKeystores.queue(
72+
definition,
73+
(secretKeyBytes: Uint8Array) => {
74+
const signer: SignerLocal = {
75+
type: SignerType.Local,
76+
secretKey: bls.SecretKey.fromBytes(secretKeyBytes),
77+
};
78+
79+
signers[index] = signer;
80+
passwords[index] = definition.password;
81+
82+
if (opts?.onDecrypt) {
83+
opts?.onDecrypt(index);
84+
}
85+
},
86+
(error: Error) => {
87+
// In-progress tasks can't be canceled, so there's a chance that multiple errors may be caught
88+
// add to the list of errors
89+
errors.push({keystoreFile: path.basename(definition.keystorePath), error});
90+
// cancel all pending tasks, no need to continue decrypting after we hit one error
91+
decryptKeystores.cancel();
92+
}
93+
);
94+
}
95+
96+
await decryptKeystores.completed();
97+
} else {
98+
// Decrypt keystores in main thread
99+
for (const [index, definition] of keystoreDefinitions.entries()) {
100+
lockKeystore(definition.keystorePath, opts);
101+
102+
try {
103+
const keystore = Keystore.parse(fs.readFileSync(definition.keystorePath, "utf8"));
104+
105+
// Memory-hogging function
106+
const secretKeyBytes = await keystore.decrypt(definition.password);
64107

65-
decryptKeystores.queue(
66-
definition,
67-
(secretKeyBytes: Uint8Array) => {
68108
const signer: SignerLocal = {
69109
type: SignerType.Local,
70110
secretKey: bls.SecretKey.fromBytes(secretKeyBytes),
@@ -76,19 +116,14 @@ export async function decryptKeystoreDefinitions(
76116
if (opts?.onDecrypt) {
77117
opts?.onDecrypt(index);
78118
}
79-
},
80-
(error: Error) => {
81-
// In-progress tasks can't be canceled, so there's a chance that multiple errors may be caught
82-
// add to the list of errors
83-
errors.push({keystoreFile: path.basename(definition.keystorePath), error});
84-
// cancel all pending tasks, no need to continue decrypting after we hit one error
85-
decryptKeystores.cancel();
119+
} catch (e) {
120+
errors.push({keystoreFile: path.basename(definition.keystorePath), error: e as Error});
121+
// stop processing, no need to continue decrypting after we hit one error
122+
break;
86123
}
87-
);
124+
}
88125
}
89126

90-
await decryptKeystores.completed();
91-
92127
if (errors.length > 0) {
93128
// If an error occurs, the program isn't going to be running,
94129
// so we should unlock all lockfiles we created

packages/cli/src/cmds/validator/options.ts

+8
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ export type IValidatorCliArgs = AccountValidatorArgs &
5555

5656
importKeystores?: string[];
5757
importKeystoresPassword?: string;
58+
disableKeystoresThreadPool?: boolean;
5859

5960
"http.requestWireFormat"?: string;
6061
"http.responseWireFormat"?: string;
@@ -301,6 +302,13 @@ export const validatorOptions: CliCommandOptions<IValidatorCliArgs> = {
301302
type: "string",
302303
},
303304

305+
disableKeystoresThreadPool: {
306+
hidden: true,
307+
description:
308+
"Disable thread pool and instead use main thread to decrypt keystores. This can speed up decryption in testing environments like Kurtosis",
309+
type: "boolean",
310+
},
311+
304312
doppelgangerProtection: {
305313
alias: ["doppelgangerProtectionEnabled"],
306314
description: "Enables Doppelganger protection",

packages/cli/src/cmds/validator/signers/index.ts

+2
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ export async function getSignersFromArgs(
9999
ignoreLockFile: args.force,
100100
onDecrypt: needle,
101101
cacheFilePath: path.join(accountPaths.cacheDir, "imported_keystores.cache"),
102+
disableThreadPool: args["disableKeystoresThreadPool"],
102103
logger,
103104
signal,
104105
});
@@ -133,6 +134,7 @@ export async function getSignersFromArgs(
133134
ignoreLockFile: args.force,
134135
onDecrypt: needle,
135136
cacheFilePath: path.join(accountPaths.cacheDir, "local_keystores.cache"),
137+
disableThreadPool: args["disableKeystoresThreadPool"],
136138
logger,
137139
signal,
138140
});

packages/cli/test/unit/validator/decryptKeystoreDefinitions.test.ts

+15-8
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ import {rimraf} from "rimraf";
55
import {getKeystoresStr} from "@lodestar/test-utils";
66
import {cachedSeckeysHex} from "../../utils/cachedKeys.js";
77
import {testFilesDir} from "../../utils.js";
8-
import {decryptKeystoreDefinitions} from "../../../src/cmds/validator/keymanager/decryptKeystoreDefinitions.js";
8+
import {
9+
decryptKeystoreDefinitions,
10+
KeystoreDecryptOptions,
11+
} from "../../../src/cmds/validator/keymanager/decryptKeystoreDefinitions.js";
912
import {LocalKeystoreDefinition} from "../../../src/cmds/validator/keymanager/interface.js";
1013
import {LockfileError, unlockFilepath} from "../../../src/util/lockfile.js";
1114

@@ -56,16 +59,20 @@ describe("decryptKeystoreDefinitions", () => {
5659
}
5760
});
5861

59-
testDecryptKeystoreDefinitions(cacheFilePath);
62+
testDecryptKeystoreDefinitions({cacheFilePath});
6063
});
6164

6265
describe("without keystore cache", () => {
6366
testDecryptKeystoreDefinitions();
6467
});
6568

66-
function testDecryptKeystoreDefinitions(cacheFilePath?: string): void {
69+
describe("disabled thread pool", () => {
70+
testDecryptKeystoreDefinitions({disableThreadPool: true});
71+
});
72+
73+
function testDecryptKeystoreDefinitions(opts?: Partial<KeystoreDecryptOptions>): void {
6774
it("decrypt keystores", async () => {
68-
const signers = await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath});
75+
const signers = await decryptKeystoreDefinitions(definitions, {logger: console, signal, ...opts});
6976
expect(signers.length).toBe(secretKeys.length);
7077
for (const signer of signers) {
7178
const hexSecret = signer.secretKey.toHex();
@@ -75,22 +82,22 @@ describe("decryptKeystoreDefinitions", () => {
7582
});
7683

7784
it("fail to decrypt keystores if lockfiles already exist", async () => {
78-
await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath});
85+
await decryptKeystoreDefinitions(definitions, {logger: console, signal, ...opts});
7986
// lockfiles should exist after the first run
8087

8188
try {
82-
await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath});
89+
await decryptKeystoreDefinitions(definitions, {logger: console, signal, ...opts});
8390
expect.fail("Second decrypt should fail due to failure to get lockfile");
8491
} catch (e) {
8592
expect((e as LockfileError).code).toBe<LockfileError["code"]>("ELOCKED");
8693
}
8794
});
8895

8996
it("decrypt keystores if lockfiles already exist if ignoreLockFile=true", async () => {
90-
await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath});
97+
await decryptKeystoreDefinitions(definitions, {logger: console, signal, ...opts});
9198
// lockfiles should exist after the first run
9299

93-
await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath, ignoreLockFile: true});
100+
await decryptKeystoreDefinitions(definitions, {logger: console, signal, ...opts, ignoreLockFile: true});
94101
});
95102
}
96103
});

0 commit comments

Comments
 (0)