This repository was archived by the owner on Jan 22, 2025. It is now read-only.

master: Add nonce to shreds repairs, add shred data size to header #10109

Merged 6 commits on May 19, 2020
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

10 changes: 7 additions & 3 deletions core/benches/shredder.rs
@@ -32,7 +32,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD;
let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size;
// ~1Mb
- let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
+ let num_ticks = max_ticks_per_n_shreds(1, Some(SIZE_OF_DATA_SHRED_PAYLOAD)) * num_shreds as u64;
let entries = create_ticks(num_ticks, 0, Hash::default());
bencher.iter(|| {
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0, 0).unwrap();
@@ -46,7 +46,11 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD;
let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size;
let txs_per_entry = 128;
- let num_entries = max_entries_per_n_shred(&make_test_entry(txs_per_entry), num_shreds as u64);
+ let num_entries = max_entries_per_n_shred(
+ &make_test_entry(txs_per_entry),
+ num_shreds as u64,
+ Some(shred_size),
+ );
let entries = make_large_unchained_entries(txs_per_entry, num_entries);
// 1Mb
bencher.iter(|| {
@@ -61,7 +65,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD;
// ~10Mb
let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size;
- let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
+ let num_ticks = max_ticks_per_n_shreds(1, Some(shred_size)) * num_shreds as u64;
let entries = create_ticks(num_ticks, 0, Hash::default());
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp, 0, 0).unwrap();
let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
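The bench updates above are call-site changes only: `max_ticks_per_n_shreds` and `max_entries_per_n_shred` now take an extra `Option<usize>` carrying the shred data payload size, with `None` presumably falling back to the default `SIZE_OF_DATA_SHRED_PAYLOAD`. The helpers themselves live in `solana_ledger::shred` and their bodies are not part of this diff, so the sketch below only illustrates the shape of the new parameter; the constants and sizing math are placeholders, not the crate's real values.

```rust
// Hypothetical sketch, not the ledger crate's real implementation.
// Assumption: `None` means "use the standard data-shred payload size".
const SIZE_OF_DATA_SHRED_PAYLOAD: usize = 1_000; // placeholder constant for the sketch
const PLACEHOLDER_SERIALIZED_TICK_SIZE: usize = 64; // placeholder, not the real entry size

fn max_ticks_per_n_shreds(num_shreds: u64, shred_data_size: Option<usize>) -> u64 {
    let payload = shred_data_size.unwrap_or(SIZE_OF_DATA_SHRED_PAYLOAD);
    // Placeholder math: how many serialized ticks fit in `num_shreds` payloads.
    (num_shreds * payload as u64) / PLACEHOLDER_SERIALIZED_TICK_SIZE as u64
}

fn main() {
    // The two call shapes used by the benches after this PR:
    let explicit = max_ticks_per_n_shreds(1, Some(SIZE_OF_DATA_SHRED_PAYLOAD));
    let implicit = max_ticks_per_n_shreds(1, None);
    assert_eq!(explicit, implicit);
    println!("ticks per shred (placeholder math): {}", explicit);
}
```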
2 changes: 1 addition & 1 deletion core/src/broadcast_stage.rs
@@ -464,7 +464,7 @@ pub mod test {
Vec<TransmitShreds>,
Vec<TransmitShreds>,
) {
- let num_entries = max_ticks_per_n_shreds(num);
+ let num_entries = max_ticks_per_n_shreds(num, None);
let (data_shreds, _) = make_slot_entries(slot, 0, num_entries);
let keypair = Arc::new(Keypair::new());
let shredder = Shredder::new(slot, 0, RECOMMENDED_FEC_RATE, keypair, 0, 0)
8 changes: 6 additions & 2 deletions core/src/broadcast_stage/standard_broadcast_run.rs
@@ -428,7 +428,7 @@ mod test {
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(leader_info.info));
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut genesis_config = create_genesis_config(10_000).genesis_config;
- genesis_config.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot) + 1;
+ genesis_config.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot, None) + 1;
let bank0 = Arc::new(Bank::new(&genesis_config));
(
blockstore,
@@ -537,7 +537,11 @@ mod test {
// Interrupting the slot should cause the unfinished_slot and stats to reset
let num_shreds = 1;
assert!(num_shreds < num_shreds_per_slot);
- let ticks1 = create_ticks(max_ticks_per_n_shreds(num_shreds), 0, genesis_config.hash());
+ let ticks1 = create_ticks(
+ max_ticks_per_n_shreds(num_shreds, None),
+ 0,
+ genesis_config.hash(),
+ );
let receive_results = ReceiveResults {
entries: ticks1.clone(),
time_elapsed: Duration::new(2, 0),
1 change: 1 addition & 0 deletions core/src/lib.rs
@@ -35,6 +35,7 @@ pub mod poh_recorder;
pub mod poh_service;
pub mod progress_map;
pub mod pubkey_references;
+ pub mod repair_response;
pub mod repair_service;
pub mod replay_stage;
mod result;
112 changes: 112 additions & 0 deletions core/src/repair_response.rs
@@ -0,0 +1,112 @@
use solana_ledger::{
blockstore::Blockstore,
shred::{Nonce, SIZE_OF_NONCE},
};
use solana_perf::packet::limited_deserialize;
use solana_sdk::{clock::Slot, packet::Packet};
use std::{io, net::SocketAddr};

pub fn repair_response_packet(
blockstore: &Blockstore,
slot: Slot,
shred_index: u64,
dest: &SocketAddr,
nonce: Nonce,
) -> Option<Packet> {
let shred = blockstore
.get_data_shred(slot, shred_index)
.expect("Blockstore could not get data shred");
shred
.map(|shred| repair_response_packet_from_shred(shred, dest, nonce))
.unwrap_or(None)
}

pub fn repair_response_packet_from_shred(
shred: Vec<u8>,
dest: &SocketAddr,
nonce: Nonce,
) -> Option<Packet> {
let mut packet = Packet::default();
packet.meta.size = shred.len() + SIZE_OF_NONCE;
if packet.meta.size > packet.data.len() {
return None;
}
packet.meta.set_addr(dest);
packet.data[..shred.len()].copy_from_slice(&shred);
let mut wr = io::Cursor::new(&mut packet.data[shred.len()..]);
bincode::serialize_into(&mut wr, &nonce).expect("Buffer not large enough to fit nonce");
Some(packet)
}

pub fn nonce(buf: &[u8]) -> Option<Nonce> {
if buf.len() < SIZE_OF_NONCE {
None
} else {
limited_deserialize(&buf[buf.len() - SIZE_OF_NONCE..]).ok()
}
}

#[cfg(test)]
mod test {
use super::*;
use solana_ledger::{
shred::{Shred, Shredder},
sigverify_shreds::verify_shred_cpu,
};
use solana_sdk::signature::{Keypair, Signer};
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr},
};

fn run_test_sigverify_shred_cpu_repair(slot: Slot) {
solana_logger::setup();
let mut shred = Shred::new_from_data(
slot,
0xc0de,
0xdead,
Some(&[1, 2, 3, 4]),
true,
true,
0,
0,
0xc0de,
);
assert_eq!(shred.slot(), slot);
let keypair = Keypair::new();
Shredder::sign_shred(&keypair, &mut shred);
trace!("signature {}", shred.common_header.signature);
let nonce = 9;
let mut packet = repair_response_packet_from_shred(
shred.payload,
&SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
nonce,
)
.unwrap();
packet.meta.repair = true;

let leader_slots = [(slot, keypair.pubkey().to_bytes())]
.iter()
.cloned()
.collect();
let rv = verify_shred_cpu(&packet, &leader_slots);
assert_eq!(rv, Some(1));

let wrong_keypair = Keypair::new();
let leader_slots = [(slot, wrong_keypair.pubkey().to_bytes())]
.iter()
.cloned()
.collect();
let rv = verify_shred_cpu(&packet, &leader_slots);
assert_eq!(rv, Some(0));

let leader_slots = HashMap::new();
let rv = verify_shred_cpu(&packet, &leader_slots);
assert_eq!(rv, None);
}

#[test]
fn test_sigverify_shred_cpu_repair() {
run_test_sigverify_shred_cpu_repair(0xdead_c0de);
}
}
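As a usage sketch (not part of the diff), the two helpers in this new module compose as follows: a serving node appends the requester's nonce after the shred payload with `repair_response_packet_from_shred`, and the requester recovers it from the tail of the packet buffer with `nonce`. The snippet assumes it lives inside `solana_core` next to the test above, so the `crate::repair_response` path resolves.

```rust
// Sketch only: round-trip a nonce through a repair response packet.
use crate::repair_response::{nonce, repair_response_packet_from_shred};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

fn roundtrip_nonce(shred_payload: Vec<u8>) {
    let dest = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
    let sent_nonce = 42;
    // Returns None when shred + nonce would overflow the packet buffer.
    if let Some(packet) = repair_response_packet_from_shred(shred_payload, &dest, sent_nonce) {
        // The nonce occupies the last SIZE_OF_NONCE bytes of the response.
        let received = nonce(&packet.data[..packet.meta.size]);
        assert_eq!(received, Some(sent_nonce));
    }
}
```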
59 changes: 27 additions & 32 deletions core/src/repair_service.rs
@@ -5,12 +5,13 @@ use crate::{
cluster_slots::ClusterSlots,
consensus::VOTE_THRESHOLD_SIZE,
result::Result,
- serve_repair::{RepairType, ServeRepair},
+ serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE},
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use solana_ledger::{
bank_forks::BankForks,
blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta},
+ shred::Nonce,
};
use solana_runtime::bank::Bank;
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
@@ -104,7 +105,7 @@ impl RepairService {
&blockstore,
&exit,
&repair_socket,
- &cluster_info,
+ cluster_info,
repair_info,
&cluster_slots,
)
@@ -118,19 +119,19 @@
blockstore: &Blockstore,
exit: &AtomicBool,
repair_socket: &UdpSocket,
- cluster_info: &Arc<ClusterInfo>,
+ cluster_info: Arc<ClusterInfo>,
repair_info: RepairInfo,
- cluster_slots: &Arc<ClusterSlots>,
+ cluster_slots: &ClusterSlots,
) {
let serve_repair = ServeRepair::new(cluster_info.clone());
let id = cluster_info.id();
- Self::initialize_lowest_slot(id, blockstore, cluster_info);
+ Self::initialize_lowest_slot(id, blockstore, &cluster_info);
let mut repair_stats = RepairStats::default();
let mut last_stats = Instant::now();
let mut duplicate_slot_repair_statuses = HashMap::new();
Self::initialize_epoch_slots(
blockstore,
- cluster_info,
+ &cluster_info,
&repair_info.completed_slots_receiver,
);
loop {
@@ -144,7 +145,7 @@
let lowest_slot = blockstore.lowest_slot();
Self::update_lowest_slot(&id, lowest_slot, &cluster_info);
Self::update_completed_slots(&repair_info.completed_slots_receiver, &cluster_info);
- cluster_slots.update(new_root, cluster_info, &repair_info.bank_forks);
+ cluster_slots.update(new_root, &cluster_info, &repair_info.bank_forks);
let new_duplicate_slots = Self::find_new_duplicate_slots(
&duplicate_slot_repair_statuses,
blockstore,
@@ -178,27 +179,19 @@

if let Ok(repairs) = repairs {
let mut cache = HashMap::new();
- let reqs: Vec<((SocketAddr, Vec<u8>), RepairType)> = repairs
- .into_iter()
- .filter_map(|repair_request| {
- serve_repair
- .repair_request(
- &cluster_slots,
- &repair_request,
- &mut cache,
- &mut repair_stats,
- )
- .map(|result| (result, repair_request))
- .ok()
- })
- .collect();
-
- for ((to, req), _) in reqs {
- repair_socket.send_to(&req, to).unwrap_or_else(|e| {
- info!("{} repair req send_to({}) error {:?}", id, to, e);
- 0
- });
- }
+ repairs.into_iter().for_each(|repair_request| {
+ if let Ok((to, req)) = serve_repair.repair_request(
+ &cluster_slots,
+ repair_request,
+ &mut cache,
+ &mut repair_stats,
+ ) {
+ repair_socket.send_to(&req, to).unwrap_or_else(|e| {
+ info!("{} repair req send_to({}) error {:?}", id, to, e);
+ 0
+ });
+ }
+ });
}

if last_stats.elapsed().as_secs() > 1 {
@@ -326,6 +319,7 @@
&repair_addr,
serve_repair,
repair_stats,
+ DEFAULT_NONCE,
) {
info!("repair req send_to({}) error {:?}", repair_addr, e);
}
@@ -346,8 +340,9 @@
to: &SocketAddr,
serve_repair: &ServeRepair,
repair_stats: &mut RepairStats,
+ nonce: Nonce,
) -> Result<()> {
- let req = serve_repair.map_repair_request(&repair_type, repair_stats)?;
+ let req = serve_repair.map_repair_request(&repair_type, repair_stats, nonce)?;
repair_socket.send_to(&req, to)?;
Ok(())
}
@@ -740,7 +735,7 @@ mod test {
let blockstore = Blockstore::open(&blockstore_path).unwrap();

let slots: Vec<u64> = vec![1, 3, 5, 7, 8];
- let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1;
+ let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1;

let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
for (mut slot_shreds, _) in shreds.into_iter() {
@@ -850,7 +845,7 @@
);

// Insert some shreds to create a SlotMeta, should make repairs
- let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1;
+ let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1;
let (mut shreds, _) = make_slot_entries(dead_slot, dead_slot - 1, num_entries_per_slot);
blockstore
.insert_shreds(shreds[..shreds.len() - 1].to_vec(), None, false)
@@ -883,7 +878,7 @@
};

// Insert some shreds to create a SlotMeta,
- let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1;
+ let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1;
let (mut shreds, _) = make_slot_entries(dead_slot, dead_slot - 1, num_entries_per_slot);
blockstore
.insert_shreds(shreds[..shreds.len() - 1].to_vec(), None, false)
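The repair-service changes thread a `nonce: Nonce` parameter through to request construction, with `DEFAULT_NONCE` used on the duplicate-slot repair path above. The point of a nonce is to let a requester tie an incoming repair response back to the request it sent. The tracking structure below is purely hypothetical (the real outstanding-request bookkeeping is not part of this excerpt) and only illustrates that intent.

```rust
// Hypothetical sketch of why requests carry a nonce; not code from this PR.
use std::collections::HashMap;

type Nonce = u32; // stand-in; the PR defines Nonce in solana_ledger::shred
type Slot = u64;

#[derive(Default)]
struct OutstandingRepairs {
    // (slot, shred_index) -> nonce attached to the outgoing request
    pending: HashMap<(Slot, u64), Nonce>,
}

impl OutstandingRepairs {
    fn record(&mut self, slot: Slot, shred_index: u64, nonce: Nonce) {
        self.pending.insert((slot, shred_index), nonce);
    }

    // Accept a response only if its trailing nonce (see repair_response::nonce)
    // matches what was sent for that shred.
    fn verify(&mut self, slot: Slot, shred_index: u64, received: Option<Nonce>) -> bool {
        match (self.pending.remove(&(slot, shred_index)), received) {
            (Some(sent), Some(got)) => sent == got,
            _ => false,
        }
    }
}

fn main() {
    let mut tracker = OutstandingRepairs::default();
    tracker.record(3, 7, 42);
    assert!(tracker.verify(3, 7, Some(42)));
    assert!(!tracker.verify(3, 7, Some(42))); // already consumed
}
```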