Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

fix(WAL): Ignore path already exist #304

Merged
merged 3 commits into from
Jun 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ futures = { version = "0.3", features = ["async-await"] }
futures-timer = "3.0"
hex = "0.4"
log = "0.4"
overlord = { git = "https://github.com/nervosnetwork/overlord", rev = "c1f46fb" }
overlord = { git = "https://github.com/nervosnetwork/overlord", rev = "f9a5dbe6" }
parking_lot = "0.10"
prost = "0.6"
rlp = "0.4"
Expand Down
28 changes: 14 additions & 14 deletions core/consensus/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use async_trait::async_trait;
use futures::lock::Mutex;
use futures_timer::Delay;
use log::error;
use log::{error, info, warn};
use overlord::error::ConsensusError as OverlordError;
use overlord::types::{Commit, Node, OverlordMsg, Status};
use overlord::{Consensus as Engine, DurationConfig, Wal};
Expand Down Expand Up @@ -71,7 +71,7 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
if current_consensus_status.latest_committed_height
!= current_consensus_status.current_proof.height
{
log::error!("[consensus] get_block for {}, error, current_consensus_status.current_height {} != current_consensus_status.current_proof.height, proof :{:?}",
error!("[consensus] get_block for {}, error, current_consensus_status.current_height {} != current_consensus_status.current_proof.height, proof :{:?}",
current_consensus_status.latest_committed_height,
current_consensus_status.current_proof.height,
current_consensus_status.current_proof)
Expand Down Expand Up @@ -123,7 +123,7 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
};

if header.height != header.proof.height + 1 {
log::error!(
error!(
"[consensus] get_block for {}, proof error, proof height mismatch, block : {:?}",
header.height,
header.clone(),
Expand Down Expand Up @@ -165,7 +165,7 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
let time = Instant::now();

if block.inner.block.header.height != block.inner.block.header.proof.height + 1 {
log::error!("[consensus-engine]: check_block for overlord receives a proposal, error, block height {}, block {:?}", block.inner.block.header.height,block.inner.block);
error!("[consensus-engine]: check_block for overlord receives a proposal, error, block height {}, block {:?}", block.inner.block.header.height,block.inner.block);
}

let order_hashes = block.get_ordered_hashes();
Expand All @@ -180,7 +180,7 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
.verify_block_header(ctx.clone(), block.inner.block.clone())
.await
.map_err(|e| {
log::error!(
error!(
"[consensus] check_block, verify_block_header error, block header: {:?}",
block.inner.block.header
);
Expand All @@ -203,7 +203,7 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
)
.await
.map_err(|e| {
log::error!(
error!(
"[consensus] check_block, verify_proof error, previous block header: {:?}, proof: {:?}",
previous_block.header,
block.inner.block.header.proof
Expand All @@ -219,7 +219,7 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
)
.await
.map_err(|e| {
log::error!("[consensus] check_block, verify_txs error",);
error!("[consensus] check_block, verify_txs error",);
e
})?;

Expand Down Expand Up @@ -255,22 +255,22 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
});
}

log::info!(
info!(
"[consensus-engine]: check block cost {:?}",
Instant::now() - time
);
let time = Instant::now();
let txs = self.adapter.get_full_txs(ctx, order_hashes).await?;

log::info!(
info!(
"[consensus-engine]: get txs cost {:?}",
Instant::now() - time
);
let time = Instant::now();
self.txs_wal
.save(next_height, Hash::from_bytes(hash)?, txs)?;

log::info!(
info!(
"[consensus-engine]: write wal cost {:?} order_hashes_len {:?}",
time.elapsed(),
order_hashes_len
Expand Down Expand Up @@ -373,7 +373,7 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
pill.block.header.height,
pill.block.header.timestamp,
)?;
log::info!(
info!(
"[consensus]: validator of height {} is {:?}",
current_height + 1,
metadata.verifier_list
Expand Down Expand Up @@ -623,7 +623,7 @@ impl<Adapter: ConsensusAdapter + 'static> ConsensusEngine<Adapter> {
if status.latest_committed_state_root != block.state_root
&& !status.list_state_root.contains(&block.state_root)
{
error!(
warn!(
"invalid status list_state_root, latest {:?}, current list {:?}, block {:?}",
status.latest_committed_state_root, status.list_state_root, block.state_root
);
Expand Down Expand Up @@ -745,7 +745,7 @@ impl<Adapter: ConsensusAdapter + 'static> ConsensusEngine<Adapter> {
let block_hash = Hash::digest(block.header.encode_fixed()?);

if block.header.height != proof.height {
log::info!("[consensus] update_status for handle_commit, error, before update, block height {}, proof height:{}, proof : {:?}",
info!("[consensus] update_status for handle_commit, error, before update, block height {}, proof height:{}, proof : {:?}",
block.header.height,
proof.height,
proof.clone());
Expand All @@ -759,7 +759,7 @@ impl<Adapter: ConsensusAdapter + 'static> ConsensusEngine<Adapter> {
if committed_status_agent.latest_committed_height
!= committed_status_agent.current_proof.height
{
log::error!("[consensus] update_status for handle_commit, error, current_height {} != current_proof.height {}, proof :{:?}",
error!("[consensus] update_status for handle_commit, error, current_height {} != current_proof.height {}, proof :{:?}",
committed_status_agent.latest_committed_height,
committed_status_agent.current_proof.height,
committed_status_agent.current_proof)
Expand Down
33 changes: 19 additions & 14 deletions core/consensus/src/wal.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::fs;
use std::io::{Read, Write};
use std::io::{ErrorKind, Read, Write};
use std::path::{Path, PathBuf};

use protocol::codec::ProtocolCodecSync;
Expand Down Expand Up @@ -31,25 +31,30 @@ impl SignedTxsWAL {
block_hash: Hash,
txs: Vec<SignedTransaction>,
) -> ProtocolResult<()> {
let mut dir = self.path.clone();
dir.push(height.to_string());
if !Path::new(&dir).exists() {
fs::create_dir(&dir).map_err(ConsensusError::WALErr)?;
let mut wal_path = self.path.clone();
wal_path.push(height.to_string());
if !wal_path.exists() {
fs::create_dir(&wal_path).map_err(ConsensusError::WALErr)?;
}

dir.push(block_hash.as_hex());
dir.set_extension("txt");
wal_path.push(block_hash.as_hex());
wal_path.set_extension("txt");

if dir.exists() {
return Ok(());
}

let mut wal_file = fs::OpenOptions::new()
let mut wal_file = match fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(dir)
.map_err(ConsensusError::WALErr)?;
.open(wal_path)
{
Ok(file) => file,
Err(err) => {
if err.kind() == ErrorKind::AlreadyExists {
return Ok(());
} else {
return Err(ConsensusError::WALErr(err).into());
}
}
};

let data = FixedSignedTxs::new(txs).encode_sync()?;
wal_file
Expand Down