Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Generalize the event log #23

Merged
merged 6 commits into from
Feb 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ serde = "1.0.27"
serde_derive = "1.0.27"
ring = "0.12.1"
untrusted = "0.5.1"
bincode = "1.0.0"
4 changes: 2 additions & 2 deletions src/bin/demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::thread::sleep;
use std::time::Duration;
use std::sync::mpsc::SendError;

fn create_log(hist: &Historian) -> Result<(), SendError<Event>> {
fn create_log(hist: &Historian<Sha256Hash>) -> Result<(), SendError<Event<Sha256Hash>>> {
sleep(Duration::from_millis(15));
let data = Sha256Hash::default();
hist.sender.send(Event::Discovery { data })?;
Expand All @@ -19,7 +19,7 @@ fn main() {
let hist = Historian::new(&seed, Some(10));
create_log(&hist).expect("send error");
drop(hist.sender);
let entries: Vec<Entry> = hist.receiver.iter().collect();
let entries: Vec<Entry<Sha256Hash>> = hist.receiver.iter().collect();
for entry in &entries {
println!("{:?}", entry);
}
Expand Down
39 changes: 20 additions & 19 deletions src/historian.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,25 @@ use std::thread::JoinHandle;
use std::sync::mpsc::{Receiver, Sender};
use std::time::{Duration, SystemTime};
use log::{hash, hash_event, Entry, Event, Sha256Hash};
use serde::Serialize;

pub struct Historian {
pub sender: Sender<Event>,
pub receiver: Receiver<Entry>,
pub thread_hdl: JoinHandle<(Entry, ExitReason)>,
pub struct Historian<T> {
pub sender: Sender<Event<T>>,
pub receiver: Receiver<Entry<T>>,
pub thread_hdl: JoinHandle<(Entry<T>, ExitReason)>,
}

#[derive(Debug, PartialEq, Eq)]
pub enum ExitReason {
RecvDisconnected,
SendDisconnected,
}
fn log_event(
sender: &Sender<Entry>,
fn log_event<T: Serialize + Clone>(
sender: &Sender<Entry<T>>,
num_hashes: &mut u64,
end_hash: &mut Sha256Hash,
event: Event,
) -> Result<(), (Entry, ExitReason)> {
event: Event<T>,
) -> Result<(), (Entry<T>, ExitReason)> {
*end_hash = hash_event(end_hash, &event);
let entry = Entry {
end_hash: *end_hash,
Expand All @@ -40,15 +41,15 @@ fn log_event(
Ok(())
}

fn log_events(
receiver: &Receiver<Event>,
sender: &Sender<Entry>,
fn log_events<T: Serialize + Clone>(
receiver: &Receiver<Event<T>>,
sender: &Sender<Entry<T>>,
num_hashes: &mut u64,
end_hash: &mut Sha256Hash,
epoch: SystemTime,
num_ticks: &mut u64,
ms_per_tick: Option<u64>,
) -> Result<(), (Entry, ExitReason)> {
) -> Result<(), (Entry<T>, ExitReason)> {
use std::sync::mpsc::TryRecvError;
loop {
if let Some(ms) = ms_per_tick {
Expand Down Expand Up @@ -79,12 +80,12 @@ fn log_events(

/// A background thread that will continue tagging received Event messages and
/// sending back Entry messages until either the receiver or sender channel is closed.
pub fn create_logger(
pub fn create_logger<T: 'static + Serialize + Clone + Send>(
start_hash: Sha256Hash,
ms_per_tick: Option<u64>,
receiver: Receiver<Event>,
sender: Sender<Entry>,
) -> JoinHandle<(Entry, ExitReason)> {
receiver: Receiver<Event<T>>,
sender: Sender<Entry<T>>,
) -> JoinHandle<(Entry<T>, ExitReason)> {
use std::thread;
thread::spawn(move || {
let mut end_hash = start_hash;
Expand All @@ -109,7 +110,7 @@ pub fn create_logger(
})
}

impl Historian {
impl<T: 'static + Serialize + Clone + Send> Historian<T> {
pub fn new(start_hash: &Sha256Hash, ms_per_tick: Option<u64>) -> Self {
use std::sync::mpsc::channel;
let (sender, event_receiver) = channel();
Expand Down Expand Up @@ -157,7 +158,7 @@ mod tests {
#[test]
fn test_historian_closed_sender() {
let zero = Sha256Hash::default();
let hist = Historian::new(&zero, None);
let hist = Historian::<u8>::new(&zero, None);
drop(hist.receiver);
hist.sender.send(Event::Tick).unwrap();
assert_eq!(
Expand All @@ -179,7 +180,7 @@ mod tests {
ExitReason::RecvDisconnected
);

let entries: Vec<Entry> = hist.receiver.iter().collect();
let entries: Vec<Entry<Sha256Hash>> = hist.receiver.iter().collect();
assert!(entries.len() > 1);
assert!(verify_slice(&entries, &zero));
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![cfg_attr(feature = "unstable", feature(test))]
pub mod log;
pub mod historian;
extern crate bincode;
extern crate generic_array;
extern crate rayon;
extern crate ring;
Expand Down
Loading