Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

Commit

Permalink
feat: change wal
Browse files Browse the repository at this point in the history
  • Loading branch information
LycrusHamster committed Sep 2, 2020
1 parent 16e9f6e commit 4e70b2c
Show file tree
Hide file tree
Showing 13 changed files with 201 additions and 138 deletions.
2 changes: 1 addition & 1 deletion core/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "0.2", features = ["macros", "sync", "rt-core", "rt-threaded"] }
bytes = { version = "0.5", features = ["serde"] }
lazy_static = "1.4"

common-apm = { path = "../../common/apm" }
common-crypto = { path = "../../common/crypto" }
Expand All @@ -37,7 +38,6 @@ protocol = { path = "../../protocol", package = "muta-protocol" }

[dev-dependencies]
bit-vec = "0.6"
lazy_static = "1.4"
num-traits = "0.2"
rand = "0.7"

Expand Down
4 changes: 3 additions & 1 deletion core/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::engine::ConsensusEngine;
use crate::fixed_types::FixedPill;
use crate::status::StatusAgent;
use crate::util::OverlordCrypto;
use crate::wal::SignedTxsWAL;
use crate::wal::{ConsensusWal, SignedTxsWAL};
use crate::{ConsensusError, ConsensusType};

/// Provide consensus
Expand Down Expand Up @@ -109,6 +109,7 @@ impl<Adapter: ConsensusAdapter + 'static> OverlordConsensus<Adapter> {
txs_wal: Arc<SignedTxsWAL>,
adapter: Arc<Adapter>,
lock: Arc<Mutex<()>>,
consensus_wal: Arc<ConsensusWal>,
) -> Self {
let engine = Arc::new(ConsensusEngine::new(
status_agent.clone(),
Expand All @@ -117,6 +118,7 @@ impl<Adapter: ConsensusAdapter + 'static> OverlordConsensus<Adapter> {
Arc::clone(&adapter),
Arc::clone(&crypto),
lock,
consensus_wal,
));

let overlord = Overlord::new(node_info.self_pub_key, Arc::clone(&engine), crypto, engine);
Expand Down
12 changes: 7 additions & 5 deletions core/consensus/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::message::{
};
use crate::status::StatusAgent;
use crate::util::{check_list_roots, digest_signed_transactions, time_now, OverlordCrypto};
use crate::wal::SignedTxsWAL;
use crate::wal::{ConsensusWal, SignedTxsWAL};
use crate::ConsensusError;

const RETRY_COMMIT_INTERVAL: u64 = 1000; // 1s
Expand All @@ -53,6 +53,7 @@ pub struct ConsensusEngine<Adapter> {
lock: Arc<Mutex<()>>,

last_commit_time: RwLock<u64>,
consensus_wal: Arc<ConsensusWal>,
last_check_block_fail_reason: RwLock<String>,
}

Expand Down Expand Up @@ -512,15 +513,14 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
#[async_trait]
impl<Adapter: ConsensusAdapter + 'static> Wal for ConsensusEngine<Adapter> {
async fn save(&self, info: Bytes) -> Result<(), Box<dyn Error + Send>> {
self.adapter
.save_overlord_wal(Context::new(), info)
.await
self.consensus_wal
.update_overlord_wal(info)
.map_err(|e| ProtocolError::from(ConsensusError::Other(e.to_string())))?;
Ok(())
}

async fn load(&self) -> Result<Option<Bytes>, Box<dyn Error + Send>> {
let res = self.adapter.load_overlord_wal(Context::new()).await.ok();
let res = self.consensus_wal.load_overlord_wal().ok();
Ok(res)
}
}
Expand All @@ -533,6 +533,7 @@ impl<Adapter: ConsensusAdapter + 'static> ConsensusEngine<Adapter> {
adapter: Arc<Adapter>,
crypto: Arc<OverlordCrypto>,
lock: Arc<Mutex<()>>,
consensus_wal: Arc<ConsensusWal>,
) -> Self {
Self {
status_agent,
Expand All @@ -543,6 +544,7 @@ impl<Adapter: ConsensusAdapter + 'static> ConsensusEngine<Adapter> {
crypto,
lock,
last_commit_time: RwLock::new(time_now()),
consensus_wal,
last_check_block_fail_reason: RwLock::new(String::new()),
}
}
Expand Down
12 changes: 10 additions & 2 deletions core/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ pub enum ConsensusType {
pub enum ConsensusError {
/// Check block error.
#[display(fmt = "Check invalid prev_hash, expect {:?} get {:?}", expect, actual)]
InvalidPrevhash { expect: Hash, actual: Hash },
InvalidPrevhash {
expect: Hash,
actual: Hash,
},

#[display(fmt = "Check invalid order root, expect {:?} get {:?}", expect, actual)]
InvalidOrderRoot {
Expand All @@ -81,7 +84,10 @@ pub enum ConsensusError {
expect,
actual
)]
InvalidOrderSignedTransactionsHash { expect: Hash, actual: Hash },
InvalidOrderSignedTransactionsHash {
expect: Hash,
actual: Hash,
},

#[display(fmt = "Check invalid status vec")]
InvalidStatusVec,
Expand Down Expand Up @@ -153,6 +159,8 @@ pub enum ConsensusError {
/// Other error used for very few errors.
#[display(fmt = "{:?}", _0)]
Other(String),

CheckSumMismatch,
}

#[derive(Debug, Display)]
Expand Down
6 changes: 4 additions & 2 deletions core/consensus/src/tests/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ use crate::engine::ConsensusEngine;
use crate::fixed_types::FixedPill;
use crate::status::StatusAgent;
use crate::util::OverlordCrypto;
use crate::wal::SignedTxsWAL;
use crate::wal::{ConsensusWal, SignedTxsWAL};

use super::*;

static FULL_TXS_PATH: &str = "./free-space/engine";
static FULL_TXS_PATH: &str = "./free-space/engine/txs";
static FULL_CONSENSUS_PATH: &str = "./free-space/engine/consensus";

#[tokio::test]
async fn test_repetitive_commit() {
Expand Down Expand Up @@ -79,6 +80,7 @@ fn init_engine(init_status: CurrentConsensusStatus) -> ConsensusEngine<MockConse
Arc::new(MockConsensusAdapter {}),
Arc::new(init_crypto()),
Arc::new(Mutex::new(())),
Arc::new(ConsensusWal::new(FULL_CONSENSUS_PATH)),
)
}

Expand Down
106 changes: 106 additions & 0 deletions core/consensus/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@ use std::fs;
use std::io::{ErrorKind, Read, Write};
use std::path::{Path, PathBuf};

use lazy_static::lazy_static;

use protocol::codec::ProtocolCodecSync;
use protocol::types::{Bytes, Hash, SignedTransaction};
use protocol::ProtocolResult;

use crate::fixed_types::FixedSignedTxs;
use crate::ConsensusError;
use bytes::{BufMut, BytesMut};

lazy_static! {
static ref OVERLORD_WAL: String = "overlord_wal".to_string();
}

#[derive(Debug)]
pub struct SignedTxsWAL {
Expand Down Expand Up @@ -129,6 +136,87 @@ impl SignedTxsWAL {
}
}

#[derive(Debug)]
pub struct ConsensusWal {
path: PathBuf,
}

impl ConsensusWal {
pub fn new<P: AsRef<Path>>(path: P) -> Self {
if !path.as_ref().exists() {
fs::create_dir_all(&path).expect("Failed to create wal directory");
}

ConsensusWal {
path: path.as_ref().to_path_buf(),
}
}

//#[muta_apm::derive::tracing_span(kind = "XXX")]
pub fn update_overlord_wal(&self, info: Bytes) -> ProtocolResult<()> {
let mut wal_path = self.path.clone();
if !wal_path.exists() {
fs::create_dir(&wal_path).map_err(ConsensusError::WALErr)?;
}

let check_sum = Hash::digest(info.clone());

wal_path.push(OVERLORD_WAL.clone());
wal_path.set_extension("txt");

let mut wal_file = match fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(wal_path)
{
Ok(file) => file,
Err(err) => {
if err.kind() == ErrorKind::AlreadyExists {
return Ok(());
} else {
return Err(ConsensusError::WALErr(err).into());
}
}
};

let mut data = BytesMut::new();

data.put(check_sum.as_bytes());
data.put(info);

wal_file
.write_all(data.as_ref())
.map_err(ConsensusError::WALErr)?;
Ok(())
}

//#[muta_apm::derive::tracing_span(kind = "XXX")]
pub fn load_overlord_wal(&self) -> ProtocolResult<Bytes> {
let mut file_path = self.path.clone();
file_path.push(OVERLORD_WAL.clone());
file_path.set_extension("txt");

let mut read_buf = Vec::new();
let mut file = fs::File::open(&file_path).map_err(ConsensusError::WALErr)?;
let _ = file
.read_to_end(&mut read_buf)
.map_err(ConsensusError::WALErr)?;

let mut data = Bytes::from(read_buf);

let info = data.split_off(Hash::from_empty().as_bytes().len());

let check_sum = Hash::digest(info.clone());

if data.eq(&check_sum.as_bytes()) {
Ok(info)
} else {
Err(ConsensusError::CheckSumMismatch.into())
}
}
}

#[rustfmt::skip]
/// Bench in Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz (8 x 2200):
/// test wal::test::bench_save_wal_1000_txs ... bench: 2,346,611 ns/iter (+/- 754,074)
Expand All @@ -154,6 +242,8 @@ mod tests {

static FULL_TXS_PATH: &str = "./free-space/wal";

static FULL_CONSENSUS_PATH: &str = "./free-space";

fn mock_hash() -> Hash {
Hash::digest(get_random_bytes(10))
}
Expand Down Expand Up @@ -233,6 +323,22 @@ mod tests {
wal.remove(3u64).unwrap();
}

#[test]
fn test_consensus_wal() {
let wal = ConsensusWal::new(FULL_CONSENSUS_PATH.to_string());
// let info = get_random_bytes(1000);

/// let s = String::from("hello");
/// let bytes = s.into_bytes();
///
/// assert_eq!(&[104, 101, 108, 108, 111][..], &bytes[..]);
let info = Bytes::from("hellow");
wal.update_overlord_wal(info.clone()).unwrap();

let load = wal.load_overlord_wal().unwrap();
assert_eq!(load,info);
}

#[test]
fn test_wal_txs_codec() {
for _ in 0..10 {
Expand Down
49 changes: 28 additions & 21 deletions core/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,28 @@ impl<Adapter: StorageAdapter> Storage for ImplStorage<Adapter> {
self.adapter
.insert::<BlockSchema>(BlockKey::new(block.header.height), block.clone())
.await?;

self.set_latest_block(ctx, block).await?;

Ok(())
}

async fn get_block(&self, _ctx: Context, height: u64) -> ProtocolResult<Option<Block>> {
self.adapter.get::<BlockSchema>(BlockKey::new(height)).await
}

async fn get_latest_block(&self, _ctx: Context) -> ProtocolResult<Block> {
let opt_block = { self.latest_block.read().await.clone() };

if let Some(block) = opt_block {
Ok(block)
} else {
let block = ensure_get!(self, LATEST_BLOCK_KEY.clone(), LatestBlockSchema);
Ok(block)
}
}

async fn set_latest_block(&self, _ctx: Context, block: Block) -> ProtocolResult<()> {
self.adapter
.insert::<LatestBlockSchema>(LATEST_BLOCK_KEY.clone(), block.clone())
.await?;
Expand All @@ -385,8 +407,12 @@ impl<Adapter: StorageAdapter> Storage for ImplStorage<Adapter> {
Ok(())
}

async fn get_block(&self, _ctx: Context, height: u64) -> ProtocolResult<Option<Block>> {
self.adapter.get::<BlockSchema>(BlockKey::new(height)).await
#[muta_apm::derive::tracing_span(kind = "storage")]
async fn update_overlord_wal(&self, ctx: Context, info: Bytes) -> ProtocolResult<()> {
self.adapter
.insert::<OverlordWalSchema>(OVERLORD_WAL_KEY.clone(), info)
.await?;
Ok(())
}

#[muta_apm::derive::tracing_span(kind = "storage")]
Expand Down Expand Up @@ -505,25 +531,6 @@ impl<Adapter: StorageAdapter> Storage for ImplStorage<Adapter> {
Ok(proof)
}

async fn get_latest_block(&self, _ctx: Context) -> ProtocolResult<Block> {
let opt_block = { self.latest_block.read().await.clone() };

if let Some(block) = opt_block {
Ok(block)
} else {
let block = ensure_get!(self, LATEST_BLOCK_KEY.clone(), LatestBlockSchema);
Ok(block)
}
}

#[muta_apm::derive::tracing_span(kind = "storage")]
async fn update_overlord_wal(&self, ctx: Context, info: Bytes) -> ProtocolResult<()> {
self.adapter
.insert::<OverlordWalSchema>(OVERLORD_WAL_KEY.clone(), info)
.await?;
Ok(())
}

#[muta_apm::derive::tracing_span(kind = "storage")]
async fn load_overlord_wal(&self, ctx: Context) -> ProtocolResult<Bytes> {
let wal_info = ensure_get!(self, OVERLORD_WAL_KEY.clone(), OverlordWalSchema);
Expand Down
Loading

0 comments on commit 4e70b2c

Please sign in to comment.