Skip to content

Commit

Permalink
fix(hubble): check reachable gRPC for hubs (#1940)
Browse files Browse the repository at this point in the history
## Motivation

- With the new and improved [diff sync
algorithm](#1907), hubs
connect to multiple peers to sync latest state
- Hubs with no gRPC port open risk delaying sync for peers, and create
network imbalance for gRPC requests
- Explicit port checks may improve health at the network, at the cost of
higher operator effort to ensure proper network configuration

## Change Summary

- Add startup check for hub to verify gRPC port is reachable from public
internet. Reachable address is required for hub to perform diff sync via
gRPC API and sync with the network. Hub operators may need to enable
port-forwarding of traffic to hub's host and port if they are behind a
NAT.
- Startup check emits warning for now, but may be enforced in the
future.

### Notes
- There is room for improvement here. 
  - We could consolidate network traffic to funnel through p2p port. 
- Reachability check could leverage bootstrap peers instead of third
party website `ipify.org`
- We perform reachability check on startup but don't re-evaluate for
running process
  - Additional followup work will be done to iterate on solution

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.
- [x] All [commits have been
signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits)

## Additional Context

If this is a relatively large or complex change, provide more details
here that will help reviewers

<!-- start pr-codex -->

---

## PR-Codex overview
This PR enhances the `@farcaster/hubble` module by adding a startup
check for the hub to verify the gRPC port's reachability from the public
internet.

### Detailed summary
- Added a startup check for the hub to verify gRPC port reachability
- Introduced `checkPort` function to verify open/closed ports
- Updated `getPublicIp` function to handle different formats
- Modified error handling for network failures and timeouts

> The following files were skipped due to too many changes:
`apps/hubble/src/hubble.ts`

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
Wazzymandias authored Apr 22, 2024
1 parent e3afd5c commit 45cf3f4
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 35 deletions.
5 changes: 5 additions & 0 deletions .changeset/silly-bobcats-yell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

fix(hubble): Add startup check for hub to verify gRPC port is reachable from public internet. Reachable address is required for hub to perform diff sync via gRPC API and sync with the network. Hub operators may need to enable port-forwarding of traffic to hub's host and port if they are behind a NAT. Startup check emits warning for now, but may be enforced in the future.
67 changes: 48 additions & 19 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,38 @@
import {
bytesToHexString,
bytesToUtf8String,
ClientOptions,
ContactInfoContent,
ContactInfoContentBody,
FarcasterNetwork,
getInsecureHubRpcClient,
getSSLHubRpcClient,
GossipAddressInfo,
GossipMessage,
HubState,
Message,
HashScheme,
HubAsyncResult,
HubError,
bytesToHexString,
bytesToUtf8String,
HubRpcClient,
getSSLHubRpcClient,
getInsecureHubRpcClient,
UserNameProof,
HubState,
Message,
OnChainEvent,
onChainEventTypeToJSON,
ClientOptions,
UserNameProof,
validations,
HashScheme,
} from "@farcaster/hub-nodejs";
import { PeerId } from "@libp2p/interface-peer-id";
import { peerIdFromBytes, peerIdFromString } from "@libp2p/peer-id";
import { publicAddressesFirst } from "@libp2p/utils/address-sort";
import { unmarshalPrivateKey, unmarshalPublicKey } from "@libp2p/crypto/keys";
import { Multiaddr, multiaddr } from "@multiformats/multiaddr";
import { Result, ResultAsync, err, ok } from "neverthrow";
import { GossipNode, MAX_MESSAGE_QUEUE_SIZE, GOSSIP_SEEN_TTL } from "./network/p2p/gossipNode.js";
import { err, ok, Result, ResultAsync } from "neverthrow";
import { GOSSIP_SEEN_TTL, GossipNode, MAX_MESSAGE_QUEUE_SIZE } from "./network/p2p/gossipNode.js";
import { PeriodicSyncJobScheduler } from "./network/sync/periodicSyncJob.js";
import SyncEngine, { FIRST_SYNC_DELAY } from "./network/sync/syncEngine.js";
import AdminServer from "./rpc/adminServer.js";
import Server from "./rpc/server.js";
import Server, { checkPortAndPublicAddress, DEFAULT_SERVER_INTERNET_ADDRESS_IPV4 } from "./rpc/server.js";
import { getHubState, putHubState } from "./storage/db/hubState.js";
import RocksDB, { DB_DIRECTORY } from "./storage/db/rocksdb.js";
import RocksDB from "./storage/db/rocksdb.js";
import { RootPrefix } from "./storage/db/types.js";
import Engine from "./storage/engine/index.js";
import { PruneEventsJobScheduler } from "./storage/jobs/pruneEventsJob.js";
Expand All @@ -42,11 +42,11 @@ import { rsDbDestroy, rsValidationMethods } from "./rustfunctions.js";
import * as tar from "tar";
import * as zlib from "zlib";
import {
SubmitMessageSuccessLogCache,
logger,
messageToLog,
messageTypeToName,
onChainEventToLog,
SubmitMessageSuccessLogCache,
usernameProofToLog,
} from "./utils/logger.js";
import {
Expand All @@ -56,7 +56,7 @@ import {
ipFamilyToString,
p2pMultiAddrStr,
} from "./utils/p2p.js";
import { fetchSnapshotMetadata, SnapshotMetadata, snapshotURL, uploadToS3 } from "./utils/snapshot.js";
import { fetchSnapshotMetadata, SnapshotMetadata, snapshotURL } from "./utils/snapshot.js";
import { PeriodicTestDataJobScheduler, TestUser } from "./utils/periodicTestDataJob.js";
import { ensureAboveMinFarcasterVersion, getMinFarcasterVersion, VersionSchedule } from "./utils/versions.js";
import { CheckFarcasterVersionJobScheduler } from "./storage/jobs/checkFarcasterVersionJob.js";
Expand All @@ -71,7 +71,7 @@ import { createPublicClient, fallback, http } from "viem";
import { mainnet, optimism } from "viem/chains";
import { AddrInfo } from "@chainsafe/libp2p-gossipsub/types";
import { CheckIncomingPortsJobScheduler } from "./storage/jobs/checkIncomingPortsJob.js";
import { NetworkConfig, applyNetworkConfig, fetchNetworkConfig } from "./network/utils/networkConfig.js";
import { applyNetworkConfig, fetchNetworkConfig, NetworkConfig } from "./network/utils/networkConfig.js";
import { UpdateNetworkConfigJobScheduler } from "./storage/jobs/updateNetworkConfigJob.js";
import { DbSnapshotBackupJobScheduler } from "./storage/jobs/dbSnapshotBackupJob.js";
import { statsd, StatsDInitParams } from "./utils/statsd.js";
Expand All @@ -89,11 +89,11 @@ import { SingleBar } from "cli-progress";
import { exportToProtobuf } from "@libp2p/peer-id-factory";
import OnChainEventStore from "./storage/stores/onChainEventStore.js";
import { ensureMessageData, isMessageInDB } from "./storage/db/message.js";
import { HubResult, MessageBundle, getFarcasterTime } from "@farcaster/core";
import { getFarcasterTime, HubResult, MessageBundle } from "@farcaster/core";
import { MerkleTrie } from "./network/sync/merkleTrie.js";
import { DEFAULT_CATCHUP_SYNC_SNAPSHOT_MESSAGE_LIMIT } from "./defaultConfig.js";
import { diagnosticReporter } from "./utils/diagnosticReport.js";
import v8 from "v8";
import { startupCheck, StartupCheckStatus } from "./utils/startupCheck.js";

export type HubSubmitSource = "gossip" | "rpc" | "eth-provider" | "l2-provider" | "sync" | "fname-registry";

Expand Down Expand Up @@ -557,7 +557,7 @@ export class Hub implements HubInterface {
async start() {
// See if we have to fetch the IP address
if (!this.options.announceIp || this.options.announceIp.trim().length === 0) {
const ipResult = await getPublicIp();
const ipResult = await getPublicIp("text");
if (ipResult.isErr()) {
log.error({ error: ipResult.error }, `failed to fetch public IP address, using ${this.options.ipMultiAddr}`);
} else {
Expand Down Expand Up @@ -708,6 +708,35 @@ export class Hub implements HubInterface {

// Start the RPC server
await this.rpcServer.start(this.options.rpcServerHost, this.options.rpcPort ?? 0);
const rpcPort = this.rpcServer.listenPort;
const rpcAddressCheck = await checkPortAndPublicAddress(
this.options.rpcServerHost ?? DEFAULT_SERVER_INTERNET_ADDRESS_IPV4,
rpcPort,
this.options.announceIp ?? undefined,
);
if (rpcAddressCheck.isErr()) {
const errorMessage = `Error validating RPC address at port ${this.options.rpcPort}.
Please make sure RPC port value is valid and reachable from public internet.
Reachable address is required for hub to perform diff sync via gRPC API and sync with the network.
Hub operators may need to enable port-forwarding of traffic to hub's host and port if they are behind a NAT.
`;
log.warn(
{
rpc_port: rpcPort,
local_address: this.options.rpcServerHost,
...(this.options.announceIp && { public_ip: this.options.announceIp }),
},
errorMessage,
);
// NOTE(wazzymandias): startup check is performed in hub start rather than cli.ts because rpc server port
// may change if initialized with zero value. We don't know correct rpc port until server starts, and hub start
// is blocking synchronous operation. In general startup checks should stay within cli.ts as much as possible.
startupCheck.printStartupCheckStatus(StartupCheckStatus.WARNING, errorMessage);
// NOTE(wazzymandias): For now, we will not throw error here, in order to give hub operators enough time
// to configure their network settings. We will throw error in the future.
// throw new HubError("unavailable.network_failure", errorMessage);
}

if (!this.options.httpServerDisabled) {
await this.httpApiServer.start(this.options.rpcServerHost, this.options.httpApiPort ?? 0);
} else {
Expand Down
104 changes: 100 additions & 4 deletions apps/hubble/src/rpc/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,17 @@ import {
SignerOnChainEvent,
OnChainEvent,
HubResult,
HubAsyncResult,
} from "@farcaster/hub-nodejs";
import { err, ok, Result } from "neverthrow";
import { err, ok, Result, ResultAsync } from "neverthrow";
import { APP_NICKNAME, APP_VERSION, HubInterface } from "../hubble.js";
import { GossipNode } from "../network/p2p/gossipNode.js";
import { NodeMetadata } from "../network/sync/merkleTrie.js";
import SyncEngine from "../network/sync/syncEngine.js";
import Engine from "../storage/engine/index.js";
import { MessagesPage } from "../storage/stores/types.js";
import { logger } from "../utils/logger.js";
import { addressInfoFromParts, extractIPAddress } from "../utils/p2p.js";
import { addressInfoFromParts, extractIPAddress, getPublicIp } from "../utils/p2p.js";
import { RateLimiterMemory } from "rate-limiter-flexible";
import {
BufferedStreamWriter,
Expand All @@ -59,13 +60,17 @@ import { sleep } from "../utils/crypto.js";
import { SUBMIT_MESSAGE_RATE_LIMIT, rateLimitByIp } from "../utils/rateLimits.js";
import { statsd } from "../utils/statsd.js";
import { SyncId } from "../network/sync/syncId.js";
import { AddressInfo } from "net";
import * as net from "node:net";
import axios from "axios";
import { fidFromEvent } from "../storage/stores/storeEventHandler.js";

const HUBEVENTS_READER_TIMEOUT = 1 * 60 * 60 * 1000; // 1 hour

export const DEFAULT_SUBSCRIBE_PERIP_LIMIT = 4; // Max 4 subscriptions per IP
export const DEFAULT_SUBSCRIBE_GLOBAL_LIMIT = 4096; // Max 4096 subscriptions globally
const MAX_EVENT_STREAM_SHARDS = 10;
export const DEFAULT_SERVER_INTERNET_ADDRESS_IPV4 = "0.0.0.0";

export type RpcUsers = Map<string, string[]>;

Expand Down Expand Up @@ -106,6 +111,97 @@ export const authenticateUser = (metadata: Metadata, rpcUsers: RpcUsers): HubRes
return err(new HubError("unauthenticated", "No authorization header"));
};

async function retryAsyncOperation<T>(
operation: () => HubAsyncResult<T>,
retries = 3,
delayMs = 1000,
): HubAsyncResult<T> {
const attempt = async (remainingRetries: number, delayMs: number): HubAsyncResult<T> => {
const result = await operation();
if (result.isErr()) {
if (remainingRetries > 0) {
await sleep(delayMs);
return attempt(remainingRetries - 1, delayMs * 2);
}

return err(result.error);
}

return ok(result.value);
};
return attempt(retries, delayMs);
}

export async function checkPort(ip: string, port: number): HubAsyncResult<void> {
if (ip === "") {
return err(new HubError("bad_request.invalid_param", "Invalid ip address"));
}

if (port === 0) {
return err(new HubError("bad_request.invalid_param", "Invalid port"));
}

return ResultAsync.fromPromise(
new Promise<void>((resolve, reject) => {
const socket = new net.Socket();
const socketTimeoutMs = 2000; // 2 seconds

socket.setTimeout(socketTimeoutMs);
socket
.once("connect", () => {
socket.destroy();
resolve();
})
.once("error", (err) => {
socket.destroy();
reject(err);
})
.once("timeout", () => {
socket.destroy();
reject(new HubError("unavailable.network_failure", `Timeout connecting to ${ip}:${port}`));
})
.connect(port, ip);
}),
(error) => {
return new HubError("unavailable.network_failure", `Failed to connect to ${ip}:${port}: ${error}`);
},
).match(
async (okResult: void): HubAsyncResult<void> => ok(okResult),
async (errorResult: HubError): HubAsyncResult<void> => err(errorResult),
);
}

export const checkPortAndPublicAddress = async (
localIP: string,
port: number,
remoteIP?: string,
): HubAsyncResult<void> => {
const retryCount = 3;
const localDelayMs = 50;
const localResult: Result<void, Error> = await retryAsyncOperation<void>(
() => checkPort(localIP, port),
retryCount,
localDelayMs, // local ping does not need high timeout
);

if (localResult.isErr()) {
return err(
new HubError("unavailable.network_failure", `Failed to connect to ${localIP}:${port}: ${localResult.error}`),
);
}

let publicIP: string = remoteIP ?? "";
if (publicIP === "") {
const publicIPResponse = await getPublicIp("json");
if (publicIPResponse.isErr()) {
return err(publicIPResponse.error);
}
publicIP = publicIPResponse.value;
}

return await retryAsyncOperation<void>(() => checkPort(publicIP, port), retryCount);
};

export const toServiceError = (err: HubError): ServiceError => {
let grpcCode: number;
if (err.errCode === "unauthenticated") {
Expand Down Expand Up @@ -285,7 +381,7 @@ export default class Server {
);
}

async start(ip = "0.0.0.0", port = 0): Promise<number> {
async start(ip = DEFAULT_SERVER_INTERNET_ADDRESS_IPV4, port = 0): Promise<number> {
return new Promise((resolve, reject) => {
this.grpcServer.bindAsync(`${ip}:${port}`, ServerCredentials.createInsecure(), (err, port) => {
if (err) {
Expand Down Expand Up @@ -324,7 +420,7 @@ export default class Server {
});
}

get address() {
get address(): HubResult<AddressInfo> {
const addr = addressInfoFromParts(this.listenIp, this.port);
return addr;
}
Expand Down
45 changes: 43 additions & 2 deletions apps/hubble/src/rpc/test/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ import {
StorageLimitsResponse,
StoreType,
getDefaultStoreLimit,
HubError,
} from "@farcaster/hub-nodejs";
import Engine from "../../storage/engine/index.js";
import { MockHub } from "../../test/mocks.js";
import Server from "../server.js";
import Server, { checkPort } from "../server.js";
import SyncEngine from "../../network/sync/syncEngine.js";
import { Ok } from "neverthrow";
import { Err, Ok } from "neverthrow";
import { sleep } from "../../utils/crypto.js";
import * as http from "http";
import { AddressInfo, createServer } from "net";

const db = jestRocksDB("protobufs.rpc.server.test");
const network = FarcasterNetwork.TESTNET;
Expand Down Expand Up @@ -214,3 +217,41 @@ describe("server rpc tests", () => {
});
});
});

describe("checkPort", () => {
let server: http.Server;
const testPort = 3000; // Example port

beforeAll((done) => {
server = http.createServer((req, res) => {
res.writeHead(200);
res.end("Test Server");
});

server.listen(testPort, done);
});

afterAll((done) => {
server.close(done);
});

it("should verify the port is open", async () => {
expect(await checkPort("127.0.0.1", testPort)).toBeInstanceOf(Ok<void, HubError>);
});

it("should verify the port is closed", async () => {
const findAvailablePort = () =>
new Promise<number>((resolve, reject) => {
const server = createServer();
server.unref();
server.listen(0, () => {
const port = (server.address() as AddressInfo).port;
server.close(() => resolve(port));
});
server.on("error", reject);
});

const closedPort = await findAvailablePort();
expect(await checkPort("127.0.0.1", closedPort)).toBeInstanceOf(Err<void, HubError>);
});
});
Loading

0 comments on commit 45cf3f4

Please sign in to comment.