Skip to content

Commit

Permalink
fix: prevent unnecessary encode/decode in grpc APIs (#1969)
Browse files Browse the repository at this point in the history
## Motivation

Prevent unnecessarily encoding/decoding messages when sending to/from
APIs

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [X] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [X] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [ ] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.
- [X] All [commits have been
signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits)

<!-- start pr-codex -->

---

## PR-Codex overview
This PR focuses on optimizing RPC APIs and message handling in the
`hubble` app.

### Detailed summary
- Optimized `get_many_messages` to return message bytes instead of
decoded messages
- Updated message handling in various store modules to work with message
bytes
- Replaced unnecessary encode/decode operations in RPC APIs

> The following files were skipped due to too many changes:
`apps/hubble/src/addon/src/store/message.rs`,
`apps/hubble/src/storage/stores/verificationStore.ts`,
`apps/hubble/src/storage/stores/reactionStore.ts`,
`apps/hubble/src/storage/stores/castStore.ts`,
`apps/hubble/src/storage/stores/linkStore.ts`

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
adityapk00 authored May 1, 2024
1 parent 6bec999 commit 93de5d7
Show file tree
Hide file tree
Showing 18 changed files with 75 additions and 84 deletions.
5 changes: 5 additions & 0 deletions .changeset/lemon-olives-change.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

fix: Prevent unnecessary decode/encode in rpc APIs
14 changes: 8 additions & 6 deletions apps/hubble/src/addon/src/store/cast_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ impl CastStore {
fid: u32,
page_options: &PageOptions,
) -> Result<MessagesPage, HubError> {
store.get_adds_by_fid(fid, page_options, Some(|_: &Message| true))
store.get_adds_by_fid::<fn(&protos::Message) -> bool>(fid, page_options, None)
}

pub fn js_create_cast_store(mut cx: FunctionContext) -> JsResult<JsBox<Arc<Store>>> {
Expand Down Expand Up @@ -539,7 +539,7 @@ impl CastStore {
fid: u32,
page_options: &PageOptions,
) -> Result<MessagesPage, HubError> {
store.get_removes_by_fid(fid, page_options, Some(|_: &Message| true))
store.get_removes_by_fid::<fn(&protos::Message) -> bool>(fid, page_options, None)
}

pub fn js_get_cast_removes_by_fid(mut cx: FunctionContext) -> JsResult<JsPromise> {
Expand Down Expand Up @@ -594,15 +594,16 @@ impl CastStore {
Ok(false) // Continue iterating
})?;

let messages = message::get_many_messages(store.db().borrow(), message_keys)?;
let messages_bytes =
message::get_many_messages_as_bytes(store.db().borrow(), message_keys)?;
let next_page_token = if last_key.len() > 0 {
Some(last_key[prefix.len()..].to_vec())
} else {
None
};

Ok(MessagesPage {
messages,
messages_bytes,
next_page_token,
})
}
Expand Down Expand Up @@ -685,15 +686,16 @@ impl CastStore {
Ok(false) // Continue iterating
})?;

let messages = message::get_many_messages(store.db().borrow(), message_keys)?;
let messages_bytes =
message::get_many_messages_as_bytes(store.db().borrow(), message_keys)?;
let next_page_token = if last_key.len() > 0 {
Some(last_key[prefix.len()..].to_vec())
} else {
None
};

Ok(MessagesPage {
messages,
messages_bytes,
next_page_token,
})
}
Expand Down
5 changes: 3 additions & 2 deletions apps/hubble/src/addon/src/store/link_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,16 @@ impl LinkStore {
Ok(false)
})?;

let messages = message::get_many_messages(store.db().borrow(), message_keys)?;
let messages_bytes =
message::get_many_messages_as_bytes(store.db().borrow(), message_keys)?;
let next_page_token = if last_key.len() > 0 {
Some(last_key[prefix.len()..].to_vec())
} else {
None
};

Ok(MessagesPage {
messages,
messages_bytes,
next_page_token,
})
}
Expand Down
29 changes: 9 additions & 20 deletions apps/hubble/src/addon/src/store/message.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{store::HubError, PageOptions, PAGE_SIZE_MAX};
use crate::{
db::{RocksDB, RocksDbTransactionBatch},
protos::{self, CastId, Message as MessageProto, MessageData, MessageType},
protos::{CastId, Message as MessageProto, MessageData, MessageType},
};
use prost::Message as _;
use std::convert::TryFrom;
Expand Down Expand Up @@ -132,7 +132,7 @@ impl UserPostfix {

/** A page of messages returned from various APIs */
pub struct MessagesPage {
pub messages: Vec<MessageProto>,
pub messages_bytes: Vec<Vec<u8>>,
pub next_page_token: Option<Vec<u8>>,
}

Expand Down Expand Up @@ -296,27 +296,16 @@ pub fn get_message(
/** Read many messages.
* Note that if a message is not found, that corresponding entry in the result will be None.
* This is different from the behaviour of get_message, which returns an error.
*
*/
pub fn get_many_messages(
pub fn get_many_messages_as_bytes(
db: &RocksDB,
primary_keys: Vec<Vec<u8>>,
) -> Result<Vec<protos::Message>, HubError> {
) -> Result<Vec<Vec<u8>>, HubError> {
let mut messages = Vec::new();

for key in primary_keys {
if let Ok(Some(value)) = db.get(&key) {
match message_decode(value.as_slice()) {
Ok(message) => {
messages.push(message);
}
Err(_) => {
return Err(HubError {
code: "db.internal_error".to_string(),
message: "could not decode message".to_string(),
})
}
}
messages.push(value);
} else {
return Err(HubError {
code: "db.internal_error".to_string(),
Expand All @@ -337,16 +326,16 @@ pub fn get_messages_page_by_prefix<F>(
where
F: Fn(&MessageProto) -> bool,
{
let mut messages = Vec::new();
let mut messages_bytes = Vec::new();
let mut last_key = vec![];

db.for_each_iterator_by_prefix(prefix, page_options, |key, value| {
match message_decode(value) {
Ok(message) => {
if filter(&message) {
messages.push(message);
messages_bytes.push(value.to_vec());

if messages.len() >= page_options.page_size.unwrap_or(PAGE_SIZE_MAX) {
if messages_bytes.len() >= page_options.page_size.unwrap_or(PAGE_SIZE_MAX) {
last_key = key.to_vec();
return Ok(true); // Stop iterating
}
Expand All @@ -368,7 +357,7 @@ where
};

Ok(MessagesPage {
messages,
messages_bytes,
next_page_token,
})
}
Expand Down
5 changes: 3 additions & 2 deletions apps/hubble/src/addon/src/store/reaction_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,15 +571,16 @@ impl ReactionStore {
Ok(false) // Continue iterating
})?;

let messages = message::get_many_messages(store.db().borrow(), message_keys)?;
let messages_bytes =
message::get_many_messages_as_bytes(store.db().borrow(), message_keys)?;
let next_page_token = if last_key.len() > 0 {
Some(last_key[prefix.len()..].to_vec())
} else {
None
};

Ok(MessagesPage {
messages,
messages_bytes,
next_page_token,
})
}
Expand Down
2 changes: 1 addition & 1 deletion apps/hubble/src/addon/src/store/user_data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl UserDataStore {
fid: u32,
page_options: &PageOptions,
) -> Result<MessagesPage, HubError> {
store.get_adds_by_fid(fid, page_options, Some(|_message: &Message| true))
store.get_adds_by_fid::<fn(&protos::Message) -> bool>(fid, page_options, None)
}

pub fn js_get_user_data_adds_by_fid(mut cx: FunctionContext) -> JsResult<JsPromise> {
Expand Down
2 changes: 1 addition & 1 deletion apps/hubble/src/addon/src/store/username_proof_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ impl UsernameProofStore {
fid: u32,
page_options: &PageOptions,
) -> Result<MessagesPage, HubError> {
store.get_adds_by_fid(fid, page_options, Some(|_message: &Message| true))
store.get_adds_by_fid::<fn(&protos::Message) -> bool>(fid, page_options, None)
}

pub fn js_get_username_proofs_by_fid(mut cx: FunctionContext) -> JsResult<JsPromise> {
Expand Down
7 changes: 2 additions & 5 deletions apps/hubble/src/addon/src/store/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use neon::{
buffer::TypedArray, Deferred, JsArray, JsBoolean, JsBox, JsBuffer, JsNumber, JsObject,
},
};
use prost::Message as _;
use std::{borrow::Borrow, sync::Arc};

/**
Expand Down Expand Up @@ -101,10 +100,8 @@ pub fn encode_messages_to_js_object<'a>(
cx: &mut TaskContext<'a>,
messages_page: MessagesPage,
) -> JsResult<'a, JsObject> {
let js_messages = JsArray::new(cx, messages_page.messages.len());
for (i, message) in messages_page.messages.iter().enumerate() {
let message_bytes = message.encode_to_vec();

let js_messages = JsArray::new(cx, messages_page.messages_bytes.len());
for (i, message_bytes) in messages_page.messages_bytes.iter().enumerate() {
let mut js_buffer = cx.buffer(message_bytes.len())?;
js_buffer.as_mut_slice(cx).copy_from_slice(&message_bytes);
js_messages.set(cx, i as u32, js_buffer)?;
Expand Down
4 changes: 2 additions & 2 deletions apps/hubble/src/addon/src/store/verification_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ impl VerificationStore {
fid: u32,
page_options: &PageOptions,
) -> Result<MessagesPage, HubError> {
store.get_adds_by_fid(fid, page_options, Some(|_message: &Message| true))
store.get_adds_by_fid::<fn(&protos::Message) -> bool>(fid, page_options, None)
}

pub fn js_get_verification_adds_by_fid(mut cx: FunctionContext) -> JsResult<JsPromise> {
Expand Down Expand Up @@ -460,7 +460,7 @@ impl VerificationStore {
fid: u32,
page_options: &PageOptions,
) -> Result<MessagesPage, HubError> {
store.get_removes_by_fid(fid, page_options, Some(|_message: &Message| true))
store.get_removes_by_fid::<fn(&protos::Message) -> bool>(fid, page_options, None)
}

pub fn js_get_verification_removes_by_fid(mut cx: FunctionContext) -> JsResult<JsPromise> {
Expand Down
3 changes: 3 additions & 0 deletions apps/hubble/src/network/sync/multiPeerSyncEngine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,9 @@ describe("Multi peer sync engine", () => {
await engine2.mergeOnChainEvent(signerEvent);
await engine2.mergeOnChainEvent(storageEvent);

await sleepWhile(() => syncEngine1.syncTrieQSize > 0, SLEEPWHILE_TIMEOUT);
await sleepWhile(() => syncEngine2.syncTrieQSize > 0, SLEEPWHILE_TIMEOUT);

expect(await syncEngine1.trie.items()).toEqual(await syncEngine2.trie.items());
expect(await syncEngine1.trie.rootHash()).toEqual(await syncEngine2.trie.rootHash());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ export class ValidateOrRevokeMessagesJobScheduler {
// Throttle the job.
// We run at the rate of 50 fids per second. If we are running ahead of schedule, we sleep to catch up
if (fid % 100 === 0) {
const allotedTimeMs = fid * TIME_SCHEDULED_PER_FID_MS;
const allotedTimeMs = (fid - lastFid) * TIME_SCHEDULED_PER_FID_MS;
const elapsedTimeMs = Date.now() - start;
if (allotedTimeMs > elapsedTimeMs) {
const sleepTimeMs = allotedTimeMs - elapsedTimeMs;
Expand Down
22 changes: 8 additions & 14 deletions apps/hubble/src/storage/stores/castStore.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
import {
CastAddMessage,
CastId,
CastRemoveMessage,
getDefaultStoreLimit,
StoreType,
Message,
} from "@farcaster/hub-nodejs";
import { CastAddMessage, CastId, CastRemoveMessage, getDefaultStoreLimit, StoreType } from "@farcaster/hub-nodejs";
import { ResultAsync } from "neverthrow";
import RocksDB from "../db/rocksdb.js";
import { UserPostfix } from "../db/types.js";
Expand All @@ -22,6 +15,7 @@ import {
rsGetCastsByParent,
rustErrorToHubError,
} from "../../rustfunctions.js";
import { messageDecode } from "../../storage/db/message.js";

class CastStore extends RustStoreBase<CastAddMessage, CastRemoveMessage> {
constructor(db: RocksDB, eventHandler: StoreEventHandler, options: StorePruneOptions = {}) {
Expand All @@ -38,7 +32,7 @@ class CastStore extends RustStoreBase<CastAddMessage, CastRemoveMessage> {
if (result.isErr()) {
throw result.error;
}
return Message.decode(new Uint8Array(result.value)) as CastAddMessage;
return messageDecode(new Uint8Array(result.value)) as CastAddMessage;
}

/** Looks up CastRemove message by cast tsHash */
Expand All @@ -48,7 +42,7 @@ class CastStore extends RustStoreBase<CastAddMessage, CastRemoveMessage> {
if (result.isErr()) {
throw result.error;
}
return Message.decode(new Uint8Array(result.value)) as CastRemoveMessage;
return messageDecode(new Uint8Array(result.value)) as CastRemoveMessage;
}

/** Gets all CastAdd messages for an fid */
Expand All @@ -57,7 +51,7 @@ class CastStore extends RustStoreBase<CastAddMessage, CastRemoveMessage> {

const messages =
messages_page.messageBytes?.map((message_bytes) => {
return Message.decode(new Uint8Array(message_bytes)) as CastAddMessage;
return messageDecode(new Uint8Array(message_bytes)) as CastAddMessage;
}) ?? [];

return { messages, nextPageToken: messages_page.nextPageToken };
Expand All @@ -69,7 +63,7 @@ class CastStore extends RustStoreBase<CastAddMessage, CastRemoveMessage> {

const messages =
message_page.messageBytes?.map((message_bytes) => {
return Message.decode(new Uint8Array(message_bytes)) as CastRemoveMessage;
return messageDecode(new Uint8Array(message_bytes)) as CastRemoveMessage;
}) ?? [];

return { messages, nextPageToken: message_page.nextPageToken };
Expand Down Expand Up @@ -100,7 +94,7 @@ class CastStore extends RustStoreBase<CastAddMessage, CastRemoveMessage> {

const messages =
message_page.messageBytes?.map((message_bytes) => {
return Message.decode(new Uint8Array(message_bytes)) as CastAddMessage;
return messageDecode(new Uint8Array(message_bytes)) as CastAddMessage;
}) ?? [];

return { messages, nextPageToken: message_page.nextPageToken };
Expand All @@ -112,7 +106,7 @@ class CastStore extends RustStoreBase<CastAddMessage, CastRemoveMessage> {

const messages =
message_page.messageBytes?.map((message_bytes) => {
return Message.decode(new Uint8Array(message_bytes)) as CastAddMessage;
return messageDecode(new Uint8Array(message_bytes)) as CastAddMessage;
}) ?? [];

return { messages, nextPageToken: message_page.nextPageToken };
Expand Down
Loading

0 comments on commit 93de5d7

Please sign in to comment.