Skip to content

Commit b7b5b40

Browse files
committed
impl: merger process
1 parent 84a8348 commit b7b5b40

File tree

4 files changed

+126
-6
lines changed

4 files changed

+126
-6
lines changed

processes/src/merger/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
mod process;
2+
mod processor;
3+
4+
pub use process::process;
5+
pub use processor::Processor;

processes/src/merger/process.rs

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
use std::time::Duration;
2+
3+
use super::Processor;
4+
use crate::transactions;
5+
use tokio::{
6+
process::{ChildStdin, ChildStdout},
7+
sync::mpsc,
8+
time::sleep,
9+
};
10+
11+
pub fn process(path: &str, transactions: transactions::Processor) -> Processor {
12+
let (sender, mut receiver) = mpsc::channel::<u32>(1000);
13+
14+
let (mut stdin, mut stdout) = nacho_js_process::spawn(&[path]).unwrap();
15+
16+
tokio::spawn(async move {
17+
let sleep = sleep(Duration::from_millis(100));
18+
tokio::pin!(sleep);
19+
loop {
20+
tokio::select! {
21+
_ = &mut sleep => {
22+
let maybe_merged_until = transactions.get_merged_until().await;
23+
let maybe_proved_until = transactions.get_proved_until().await;
24+
25+
if let Some((merged_until, proved_until)) = maybe_merged_until.zip(maybe_proved_until) {
26+
if proved_until <= 1 {
27+
continue;
28+
}
29+
30+
if merged_until == 0 {
31+
if start_merge(&mut stdin, &mut stdout, 0).await.is_ok() {
32+
transactions.set_merged_until(2).await;
33+
}
34+
continue;
35+
}
36+
37+
if proved_until > merged_until {
38+
if continue_merge(&mut stdin, &mut stdout, merged_until as u32).await.is_ok() {
39+
transactions.set_merged_until(merged_until + 1).await;
40+
}
41+
continue;
42+
}
43+
}
44+
45+
}
46+
msg = receiver.recv() => {
47+
if let Some(at) = msg {
48+
if start_merge(&mut stdin, &mut stdout, at).await.is_ok() {
49+
transactions.set_merged_until(at as u64 + 2).await;
50+
}
51+
}
52+
},
53+
54+
}
55+
}
56+
});
57+
58+
Processor {
59+
sender: Box::leak(Box::new(sender)),
60+
}
61+
}
62+
63+
pub async fn start_merge(
64+
stdin: &mut ChildStdin,
65+
stdout: &mut ChildStdout,
66+
at: u32,
67+
) -> Result<(), ()> {
68+
let mut input = [0u8; 5];
69+
let mut output = [0u8; 1];
70+
71+
input[0] = 0;
72+
input[1..5].copy_from_slice(&at.to_le_bytes());
73+
74+
nacho_js_process::interact(stdin, stdout, &input, &mut output)
75+
.await
76+
.map_err(|_| ())?;
77+
78+
let is_success = output[0] != 0;
79+
80+
if is_success {
81+
Ok(())
82+
} else {
83+
Err(())
84+
}
85+
}
86+
87+
pub async fn continue_merge(
88+
stdin: &mut ChildStdin,
89+
stdout: &mut ChildStdout,
90+
at: u32,
91+
) -> Result<(), ()> {
92+
let mut input = [0u8; 5];
93+
let mut output = [0u8; 1];
94+
95+
input[0] = 1;
96+
input[1..5].copy_from_slice(&at.to_le_bytes());
97+
98+
nacho_js_process::interact(stdin, stdout, &input, &mut output)
99+
.await
100+
.map_err(|_| ())?;
101+
102+
let is_success = output[0] != 0;
103+
104+
if is_success {
105+
Ok(())
106+
} else {
107+
Err(())
108+
}
109+
}

processes/src/merger/processor.rs

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
use tokio::sync::mpsc;
2+
3+
#[derive(Clone, Copy, Debug)]
4+
pub struct Processor {
5+
pub(crate) sender: &'static mpsc::Sender<u32>,
6+
}
7+
8+
impl Processor {
9+
pub async fn start_merge(&self, at: u32) {
10+
self.sender.send(at).await.unwrap()
11+
}
12+
}

proof-merger-process/src/types.ts

-6
This file was deleted.

0 commit comments

Comments
 (0)