Skip to content

Commit

Permalink
add max_receiver/max_senders to rpc
Browse files Browse the repository at this point in the history
Signed-off-by: Shumo Chu <shumo.chu@protonmail.com>
  • Loading branch information
stechu committed Jun 23, 2022
1 parent 9316c1f commit e403539
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 45 deletions.
50 changes: 31 additions & 19 deletions pallets/manta-pay/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,26 +464,33 @@ pub mod pallet {
T: Config,
{
/// Maximum Number of Updates per Shard
const PULL_MAX_RECEIVER_UPDATE_SIZE: usize = 4096;
const PULL_MAX_RECEIVER_UPDATE_SIZE: u64 = 32768;

/// Maximum Size of Sender Data Update
const PULL_MAX_SENDER_UPDATE_SIZE: usize = 4096;
const PULL_MAX_SENDER_UPDATE_SIZE: u64 = 32768;

/// Pulls receiver data from the ledger starting at the `receiver_index`.
#[inline]
fn pull_receivers(receiver_index: [usize; 256]) -> (bool, ReceiverChunk) {
fn pull_receivers(receiver_index: [usize; 256], max_update_request: u64) -> (bool, ReceiverChunk) {
let mut more_receivers = false;
let mut receivers = Vec::new();
let mut receivers_pulled: usize = 0;
let mut receivers_pulled: u64 = 0;
let max_update = if max_update_request > Self::PULL_MAX_RECEIVER_UPDATE_SIZE {
Self::PULL_MAX_RECEIVER_UPDATE_SIZE
} else {
max_update_request
};

for (i, index) in receiver_index.into_iter().enumerate() {
more_receivers |= Self::pull_receivers_for_shard(
i as u8,
index,
max_update,
&mut receivers,
&mut receivers_pulled,
);
// if max capacity is reached and there is more to pull, then we return
if receivers_pulled == Self::PULL_MAX_RECEIVER_UPDATE_SIZE && more_receivers {
if receivers_pulled == max_update && more_receivers {
break;
}
}
Expand All @@ -496,32 +503,37 @@ pub mod pallet {
fn pull_receivers_for_shard(
shard_index: u8,
receiver_index: usize,
max_update: u64,
receivers: &mut ReceiverChunk,
receiver_pulled: &mut usize,
receiver_pulled: &mut u64,
) -> bool {
let max_receiver_index = receiver_index + Self::PULL_MAX_RECEIVER_UPDATE_SIZE;
for idx in receiver_index..max_receiver_index {
if *receiver_pulled == Self::PULL_MAX_RECEIVER_UPDATE_SIZE {
return Shards::<T>::contains_key(shard_index, idx as u64);
let max_receiver_index = (receiver_index as u64) + max_update;
for idx in (receiver_index as u64)..max_receiver_index {
if *receiver_pulled == max_update {
return Shards::<T>::contains_key(shard_index, idx);
}
match Shards::<T>::try_get(shard_index, idx as u64) {
match Shards::<T>::try_get(shard_index, idx) {
Ok(next) => {
*receiver_pulled += 1;
receivers.push(next);
}
_ => return false,
}
}
Shards::<T>::contains_key(shard_index, max_receiver_index as u64)
Shards::<T>::contains_key(shard_index, max_receiver_index)
}

/// Pulls sender data from the ledger starting at the `sender_index`.
#[inline]
fn pull_senders(sender_index: usize) -> (bool, SenderChunk) {
fn pull_senders(sender_index: usize, max_update_request: u64) -> (bool, SenderChunk) {
let mut senders = Vec::new();
let max_sender_index = sender_index + Self::PULL_MAX_SENDER_UPDATE_SIZE;
for idx in sender_index..max_sender_index {
match VoidNumberSetInsertionOrder::<T>::try_get(idx as u64) {
let max_sender_index = if max_update_request > Self::PULL_MAX_SENDER_UPDATE_SIZE {
(sender_index as u64) + Self::PULL_MAX_SENDER_UPDATE_SIZE
} else {
(sender_index as u64) + max_update_request
};
for idx in (sender_index as u64)..max_sender_index {
match VoidNumberSetInsertionOrder::<T>::try_get(idx) {
Ok(next) => senders.push(next),
_ => return (false, senders),
}
Expand All @@ -534,9 +546,9 @@ pub mod pallet {

/// Returns the diff of ledger state since the given `checkpoint`.
#[inline]
pub fn pull_ledger_diff(checkpoint: Checkpoint) -> PullResponse {
let (more_receivers, receivers) = Self::pull_receivers(*checkpoint.receiver_index);
let (more_senders, senders) = Self::pull_senders(checkpoint.sender_index);
pub fn pull_ledger_diff(checkpoint: Checkpoint, max_receivers: u64, max_senders: u64) -> PullResponse {
let (more_receivers, receivers) = Self::pull_receivers(*checkpoint.receiver_index, max_receivers);
let (more_senders, senders) = Self::pull_senders(checkpoint.sender_index, max_senders);
PullResponse {
should_continue: more_receivers || more_senders,
receivers,
Expand Down
6 changes: 3 additions & 3 deletions pallets/manta-pay/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub trait PullApi {
/// Returns the update required to be synchronized with the ledger starting from
/// `checkpoint`.
#[rpc(name = "mantaPay_pull_ledger_diff")]
fn pull_ledger_diff(&self, checkpoint: Checkpoint) -> Result<PullResponse>;
fn pull_ledger_diff(&self, checkpoint: Checkpoint, max_receivers: u64, max_senders: u64) -> Result<PullResponse>;
}

/// Pull RPC API Implementation
Expand Down Expand Up @@ -62,10 +62,10 @@ where
C::Api: PullLedgerDiffApi<B>,
{
#[inline]
fn pull_ledger_diff(&self, checkpoint: Checkpoint) -> Result<PullResponse> {
fn pull_ledger_diff(&self, checkpoint: Checkpoint, max_receivers: u64, max_senders: u64) -> Result<PullResponse> {
let api = self.client.runtime_api();
let at = BlockId::hash(self.client.info().best_hash);
api.pull_ledger_diff(&at, checkpoint.into())
api.pull_ledger_diff(&at, checkpoint.into(), max_receivers, max_senders)
.map_err(|err| Error {
code: ErrorCode::ServerError(1),
message: "Unable to compute state diff for pull".into(),
Expand Down
2 changes: 1 addition & 1 deletion pallets/manta-pay/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ use manta_pay::signer::RawCheckpoint;

sp_api::decl_runtime_apis! {
pub trait PullLedgerDiffApi {
fn pull_ledger_diff(checkpoint: RawCheckpoint) -> PullResponse;
fn pull_ledger_diff(checkpoint: RawCheckpoint, max_receivers: u64, max_senders: u64) -> PullResponse;
}
}
4 changes: 2 additions & 2 deletions runtime/dolphin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -881,8 +881,8 @@ impl_runtime_apis! {
}

impl pallet_manta_pay::runtime::PullLedgerDiffApi<Block> for Runtime {
fn pull_ledger_diff(checkpoint: pallet_manta_pay::RawCheckpoint) -> pallet_manta_pay::PullResponse {
MantaPay::pull_ledger_diff(checkpoint.into())
fn pull_ledger_diff(checkpoint: pallet_manta_pay::RawCheckpoint, max_receiver: u64, max_sender: u64) -> pallet_manta_pay::PullResponse {
MantaPay::pull_ledger_diff(checkpoint.into(), max_receiver, max_sender)
}
}

Expand Down
27 changes: 19 additions & 8 deletions tests/manta_pay_rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ function generate_utxo_data(per_shard_amount: number, checkpoint: Array<number>)
for(var shard_idx = 0; shard_idx < SHARD_NUMBER; ++ shard_idx) {
for(var utxo_idx = checkpoint[shard_idx]; utxo_idx < checkpoint[shard_idx] + per_shard_amount; ++ utxo_idx) {
const shards_storage_key = double_map_storage_key(
"MantaPay", "Shards", shard_idx, 8, HashType.Identity, utxo_idx, 64, HashType.Identity);
"MantaPay", "Shards", shard_idx, 8, HashType.TwoxConcat, utxo_idx, 64, HashType.TwoxConcat);
let value_str = u8aToHex(generate_utxo(shard_idx, utxo_idx));
data.push([shards_storage_key, value_str]);
}
Expand Down Expand Up @@ -106,7 +106,7 @@ function generate_vn_data(
var data = [];
for (var idx = start_index; idx < start_index + amount_per_batch; idx ++){
const vn_storage_key = single_map_storage_key(
"MantaPay", "VoidNumberSetInsertionOrder", idx, 64, HashType.Identity);
"MantaPay", "VoidNumberSetInsertionOrder", idx, 64, HashType.TwoxConcat);
data.push([vn_storage_key, u8aToHex(generate_void_number(idx))]);
}
return data;
Expand Down Expand Up @@ -195,11 +195,11 @@ async function setup_storage(api:ApiPromise, init_utxo_idx: number) {
const keyring = new Keyring({ type: 'sr25519' });
const sudo_key_pair = keyring.addFromMnemonic('bottom drive obey lake curtain smoke basket hold race lonely fit walk//Alice');

const utxo_big_batch_number = 1;
const utxo_big_batch_number = 2;
const utxo_batch_number = 4;
const utxo_per_shard = 16;
const vn_batch_number = 1;
const vn_batch_size = 1024;
const vn_batch_number = 8;
const vn_batch_size = 4096;

var receiver_checkpoint = new Array<number>(SHARD_NUMBER);
var check_idx = init_utxo_idx;
Expand All @@ -222,7 +222,9 @@ async function single_rpc_performance(api:ApiPromise) {
const before_rpc = performance.now();
var receiver_checkpoint = new Array<number>(SHARD_NUMBER);
receiver_checkpoint.fill(0);
const data = await (api.rpc as any).mantaPay.pull_ledger_diff({receiver_index: new Array<number>(SHARD_NUMBER).fill(0), sender_index: 0});
const data = await (api.rpc as any).mantaPay.pull_ledger_diff(
{receiver_index: new Array<number>(SHARD_NUMBER).fill(0), sender_index: 0},
BigInt(16384), BigInt(16384));
const after_rpc = performance.now();
console.log("ledger diff receiver size: %i", data.receivers.length);
console.log("ledger diff void number size: %i", data.senders.length);
Expand Down Expand Up @@ -264,18 +266,27 @@ async function main(){
{
name: 'checkpoint',
type: 'Checkpoint'
},
{
name: 'max_receiver',
type: 'u64'
},
{
name: 'max_sender',
type: 'u64'
}

],
type: 'PullResponse'
}
}
}});

//await setup_storage(api, 64);
await setup_storage(api, 0);
//const block_hash = await api.rpc.chain.getBlockHash()
//let shards: any[][] = await (api.query as any).mantaPay.shards.entriesAt(block_hash);
//console.log("shards size: %i", shards.length);
//await single_rpc_performance(api);
await single_rpc_performance(api);
const block_hash = await api.rpc.chain.getBlockHash();
var before = performance.now();
let shards = await ((await api.at(block_hash)).query as any).mantaPay.shards.entries();
Expand Down
4 changes: 2 additions & 2 deletions tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
"@polkadot/api": "^7.15.1",
"@polkadot/types": "^7.15.1",
"@types/yargs": "^17.0.0",
"minimist": "^1.2.5",
"ts-node": "^10.8.0",
"typescript": "4.6.4",
"minimist": "^1.2.5"
"typescript": "4.6.4"
}
}
28 changes: 18 additions & 10 deletions tests/test-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import type { HexString } from '@polkadot/util/types';
import { u8aToHex, u8aToBigInt, numberToU8a, nToU8a} from '@polkadot/util';

export enum HashType {
Identity
Identity,
TwoxConcat
};

// delay sometime (usually used for wait a block confirmation)
Expand All @@ -30,16 +31,25 @@ export function single_map_storage_key(
bitLength: number,
hash_type: HashType
) : String {
if(hash_type !== HashType.Identity){
throw Error("HashType can only be Identity");
}
let binary = new Uint8Array([
...twox_128(module_name),
...twox_128(variable_name),
...numberToU8a(key, bitLength).reverse()]);
...hash_number(key, bitLength, hash_type)]);
return u8aToHex(binary);
}

function hash_number(key: number, bit_length: number, hash_type: HashType): Uint8Array {
switch (hash_type) {
case HashType.Identity:
return numberToU8a(key, bit_length).reverse();
case HashType.TwoxConcat:
return new Uint8Array([
...xxhashAsU8a(numberToU8a(key, bit_length).reverse(), 64),
...numberToU8a(key, bit_length).reverse()
])
}
}

// only support Identity hash type for now
export function double_map_storage_key(
module_name: string,
Expand All @@ -51,14 +61,12 @@ export function double_map_storage_key(
bitLength_2: number,
hash_type_2: HashType
): String {
if(hash_type_1 !== HashType.Identity || hash_type_2 !== HashType.Identity){
throw Error("HashType can only be Identity");
}

let binary = new Uint8Array([
...twox_128(module_name),
...twox_128(variable_name),
...numberToU8a(key_1, bitLength_1).reverse(),
...numberToU8a(key_2, bitLength_2).reverse()
...hash_number(key_1, bitLength_1, hash_type_1),
...hash_number(key_2, bitLength_2, hash_type_2),
]);
return u8aToHex(binary);
}
Expand Down

0 comments on commit e403539

Please sign in to comment.