Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improve error handling, and more! #524

Merged
merged 25 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f1a6b4d
spelling fixes
MicaiahReid Mar 12, 2024
8863c67
fix overflow subraction
MicaiahReid Mar 12, 2024
b7e2a02
fix: continue on failed predicate registration
MicaiahReid Mar 12, 2024
af42d42
fix: terminate service on ProcessBitcoinBlock failure
MicaiahReid Mar 12, 2024
23165ae
spelling fix
MicaiahReid Mar 12, 2024
dfa63ae
remove panic on missing btc block headers
MicaiahReid Mar 12, 2024
07bdc0d
log crit error on ObserverEvent loop failure
MicaiahReid Mar 12, 2024
972979a
fix: crash service if scan loops crash
MicaiahReid Mar 12, 2024
94a98b4
some cleanup
MicaiahReid Mar 12, 2024
b0313a4
fix: shutdown predicate registration api on Terminate event
MicaiahReid Mar 12, 2024
ed04d76
fix: terminate event observer on failure
MicaiahReid Mar 13, 2024
2e6be80
fix: predicate deregistration deletes interrupted predicates
MicaiahReid Mar 13, 2024
86e5910
remove some unwraps
MicaiahReid Mar 13, 2024
dbe48ae
fix some log levels
MicaiahReid Mar 13, 2024
0ae70da
fix: update log levels across chainhook service
MicaiahReid Mar 14, 2024
f039232
fix tests
MicaiahReid Mar 14, 2024
4559171
add todo
MicaiahReid Mar 15, 2024
c5a474a
fix: remove unrwap/panic from config/archive
MicaiahReid Mar 15, 2024
1464726
remove unwrap from stacks csv parsing
MicaiahReid Mar 15, 2024
617bbdd
remove unwrap/expect from database updates
MicaiahReid Mar 15, 2024
133868a
warn on unable to infer progress
MicaiahReid Mar 15, 2024
8c0aed0
chore: warn on all reorgs
MicaiahReid Mar 18, 2024
3fa934e
fix: only update tsv if user configuration says to
MicaiahReid Mar 20, 2024
f95dd9c
chore: set release feature log level
MicaiahReid Mar 20, 2024
4c4f944
fix tsv assertion formatting
MicaiahReid Mar 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions components/chainhook-cli/src/archive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
println!("{}", e.to_string());
});

let remote_sha_url = config.expected_remote_stacks_tsv_sha256();
let remote_sha_url = config.expected_remote_stacks_tsv_sha256()?;
let res = reqwest::get(&remote_sha_url)
.await
.or(Err(format!("Failed to GET from '{}'", &remote_sha_url)))?
Expand All @@ -34,7 +34,7 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {

write_file_content_at_path(&local_sha_file_path, &res.to_vec())?;

let file_url = config.expected_remote_stacks_tsv_url();
let file_url = config.expected_remote_stacks_tsv_url()?;
let res = reqwest::get(&file_url)
.await
.or(Err(format!("Failed to GET from '{}'", &file_url)))?;
Expand All @@ -55,14 +55,17 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
Ok(0) => break,
Ok(n) => {
if let Err(e) = file.write_all(&buffer[..n]) {
let err =
format!("unable to update compressed archive: {}", e.to_string());
return Err(err);
return Err(format!(
"unable to update compressed archive: {}",
e.to_string()
));
}
}
Err(e) => {
let err = format!("unable to write compressed archive: {}", e.to_string());
return Err(err);
return Err(format!(
"unable to write compressed archive: {}",
e.to_string()
));
}
}
}
Expand All @@ -83,12 +86,11 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
.map_err(|e| format!("unable to download stacks archive: {}", e.to_string()))?;
}
drop(tx);

tokio::task::spawn_blocking(|| decoder_thread.join())
.await
.unwrap()
.unwrap()
.unwrap();
.map_err(|e| format!("failed to spawn thread: {e}"))?
.map_err(|e| format!("decoder thread failed when downloading tsv: {:?}", e))?
.map_err(|e| format!("failed to download tsv: {}", e))?;
}

Ok(())
Expand Down Expand Up @@ -124,11 +126,14 @@ impl Read for ChannelRead {
}
}

pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Context) -> bool {
pub async fn download_stacks_dataset_if_required(
config: &mut Config,
ctx: &Context,
) -> Result<bool, String> {
if config.is_initial_ingestion_required() {
// Download default tsv.
if config.rely_on_remote_stacks_tsv() && config.should_download_remote_stacks_tsv() {
let url = config.expected_remote_stacks_tsv_url();
let url = config.expected_remote_stacks_tsv_url()?;
let mut tsv_file_path = config.expected_cache_path();
tsv_file_path.push(default_tsv_file_path(&config.network.stacks_network));
let mut tsv_sha_file_path = config.expected_cache_path();
Expand All @@ -137,7 +142,7 @@ pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Cont
// Download archive if not already present in cache
// Load the local
let local_sha_file = read_file_content_at_path(&tsv_sha_file_path);
let sha_url = config.expected_remote_stacks_tsv_sha256();
let sha_url = config.expected_remote_stacks_tsv_sha256()?;

let remote_sha_file = match reqwest::get(&sha_url).await {
Ok(response) => response.bytes().await,
Expand All @@ -164,28 +169,25 @@ pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Cont
"Stacks archive file already up to date"
);
config.add_local_stacks_tsv_source(&tsv_file_path);
return false;
return Ok(false);
}

info!(ctx.expect_logger(), "Downloading {}", url);
match download_tsv_file(&config).await {
Ok(_) => {}
Err(e) => {
error!(ctx.expect_logger(), "{}", e);
std::process::exit(1);
}
Err(e) => return Err(e),
}
info!(ctx.expect_logger(), "Successfully downloaded tsv file");
config.add_local_stacks_tsv_source(&tsv_file_path);
}
true
Ok(true)
} else {
info!(
ctx.expect_logger(),
"Streaming blocks from stacks-node {}",
config.network.get_stacks_node_config().rpc_url
);
false
Ok(false)
}
}

Expand Down
10 changes: 8 additions & 2 deletions components/chainhook-cli/src/archive/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,14 @@ async fn it_downloads_stacks_dataset_if_required() {
tracer: false,
};
let mut config_clone = config.clone();
assert!(download_stacks_dataset_if_required(&mut config, &ctx).await);
assert!(!download_stacks_dataset_if_required(&mut config_clone, &ctx).await);
assert!(download_stacks_dataset_if_required(&mut config, &ctx)
.await
.unwrap());
assert!(
!download_stacks_dataset_if_required(&mut config_clone, &ctx)
.await
.unwrap()
);

let mut tsv_file_path = config.expected_cache_path();
tsv_file_path.push(default_tsv_file_path(&config.network.stacks_network));
Expand Down
4 changes: 2 additions & 2 deletions components/chainhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,14 @@ pub fn main() {
let opts: Opts = match Opts::try_parse() {
Ok(opts) => opts,
Err(e) => {
error!(ctx.expect_logger(), "{e}");
crit!(ctx.expect_logger(), "{e}");
process::exit(1);
}
};

match hiro_system_kit::nestable_block_on(handle_command(opts, ctx.clone())) {
Err(e) => {
error!(ctx.expect_logger(), "{e}");
crit!(ctx.expect_logger(), "{e}");
process::exit(1);
}
Ok(_) => {}
Expand Down
22 changes: 12 additions & 10 deletions components/chainhook-cli/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,13 @@ impl Config {
}
}

pub fn expected_local_stacks_tsv_file(&self) -> &PathBuf {
pub fn expected_local_stacks_tsv_file(&self) -> Result<&PathBuf, String> {
for source in self.event_sources.iter() {
if let EventSourceConfig::StacksTsvPath(config) = source {
return &config.file_path;
return Ok(&config.file_path);
}
}
panic!("expected local-tsv source")
Err("could not find expected local tsv source")?
}

pub fn expected_cache_path(&self) -> PathBuf {
Expand All @@ -271,21 +271,23 @@ impl Config {
destination_path
}

fn expected_remote_stacks_tsv_base_url(&self) -> &String {
fn expected_remote_stacks_tsv_base_url(&self) -> Result<&String, String> {
for source in self.event_sources.iter() {
if let EventSourceConfig::StacksTsvUrl(config) = source {
return &config.file_url;
return Ok(&config.file_url);
}
}
panic!("expected remote-tsv source")
Err("could not find expected remote tsv source")?
}

pub fn expected_remote_stacks_tsv_sha256(&self) -> String {
format!("{}.sha256", self.expected_remote_stacks_tsv_base_url())
pub fn expected_remote_stacks_tsv_sha256(&self) -> Result<String, String> {
self.expected_remote_stacks_tsv_base_url()
.map(|url| format!("{}.sha256", url))
}

pub fn expected_remote_stacks_tsv_url(&self) -> String {
format!("{}.gz", self.expected_remote_stacks_tsv_base_url())
pub fn expected_remote_stacks_tsv_url(&self) -> Result<String, String> {
self.expected_remote_stacks_tsv_base_url()
.map(|url| format!("{}.gz", url))
}

pub fn rely_on_remote_stacks_tsv(&self) -> bool {
Expand Down
28 changes: 21 additions & 7 deletions components/chainhook-cli/src/config/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,34 +108,48 @@ fn should_download_remote_stacks_tsv_handles_both_modes() {
}

#[test]
#[should_panic(expected = "expected remote-tsv source")]
fn expected_remote_stacks_tsv_base_url_panics_if_missing() {
let url_src = EventSourceConfig::StacksTsvUrl(super::UrlConfig {
file_url: format!("test"),
});
let mut config = Config::default(true, false, false, &None).unwrap();

config.event_sources = vec![url_src.clone()];
assert_eq!(config.expected_remote_stacks_tsv_base_url(), "test");
match config.expected_remote_stacks_tsv_base_url() {
Ok(tsv_url) => assert_eq!(tsv_url, "test"),
Err(e) => {
panic!("expected tsv file: {e}")
}
}

config.event_sources = vec![];
config.expected_remote_stacks_tsv_base_url();
match config.expected_remote_stacks_tsv_base_url() {
Ok(tsv_url) => panic!("expected no tsv file, found {}", tsv_url),
Err(e) => assert_eq!(e, "could not find expected remote tsv source".to_string()),
};
}

#[test]
#[should_panic(expected = "expected local-tsv source")]
fn expected_local_stacks_tsv_base_url_panics_if_missing() {
fn expected_local_stacks_tsv_base_url_errors_if_missing() {
let path = PathBuf::from("test");
let path_src = EventSourceConfig::StacksTsvPath(PathConfig {
file_path: path.clone(),
});
let mut config = Config::default(true, false, false, &None).unwrap();

config.event_sources = vec![path_src.clone()];
assert_eq!(config.expected_local_stacks_tsv_file(), &path);
match config.expected_local_stacks_tsv_file() {
Ok(tsv_path) => assert_eq!(tsv_path, &path),
Err(e) => {
panic!("expected tsv file: {e}")
}
}

config.event_sources = vec![];
config.expected_local_stacks_tsv_file();
match config.expected_local_stacks_tsv_file() {
Ok(tsv_path) => panic!("expected no tsv file, found {}", tsv_path.to_string_lossy()),
Err(e) => assert_eq!(e, "could not find expected local tsv source".to_string()),
};
}

#[test]
Expand Down
13 changes: 9 additions & 4 deletions components/chainhook-cli/src/scan/bitcoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
config: &Config,
ctx: &Context,
) -> Result<bool, String> {
let predicate_uuid = &predicate_spec.uuid;
let auth = Auth::UserPass(
config.network.bitcoind_rpc_username.clone(),
config.network.bitcoind_rpc_password.clone(),
Expand Down Expand Up @@ -71,9 +72,9 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
PredicatesApi::Off => None,
};

info!(
debug!(
ctx.expect_logger(),
"Starting predicate evaluation on Bitcoin blocks",
"Starting predicate evaluation on Bitcoin blocks for predicate {predicate_uuid}",
);

let mut last_block_scanned = BlockIdentifier::default();
Expand Down Expand Up @@ -200,7 +201,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(

info!(
ctx.expect_logger(),
"{number_of_blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
"Predicate {predicate_uuid} scan completed. {number_of_blocks_scanned} blocks scanned, {actions_triggered} actions triggered."
);

if let Some(ref mut predicates_db_conn) = predicates_db_conn {
Expand Down Expand Up @@ -269,9 +270,13 @@ pub async fn execute_predicates_action<'a>(
if trigger.chainhook.include_proof {
gather_proofs(&trigger, &mut proofs, &config, &ctx);
}
let predicate_uuid = &trigger.chainhook.uuid;
match handle_bitcoin_hook_action(trigger, &proofs) {
Err(e) => {
error!(ctx.expect_logger(), "unable to handle action {}", e);
warn!(
ctx.expect_logger(),
"unable to handle action for predicate {}: {}", predicate_uuid, e
);
}
Ok(action) => {
actions_triggered += 1;
Expand Down
Loading
Loading