From adc0a6256c2e40d5e49c8cf030d43ff5cc11be68 Mon Sep 17 00:00:00 2001 From: yukang Date: Fri, 21 Feb 2025 11:59:35 +0800 Subject: [PATCH] add cron job to check active channels --- src/fiber/channel.rs | 91 +++++++++++++++++++++++++++----------- src/fiber/network.rs | 24 ++++++++++ src/fiber/tests/payment.rs | 40 +++++++++-------- src/store/tests/store.rs | 2 + 4 files changed, 113 insertions(+), 44 deletions(-) diff --git a/src/fiber/channel.rs b/src/fiber/channel.rs index 62718d668..608d37007 100644 --- a/src/fiber/channel.rs +++ b/src/fiber/channel.rs @@ -96,6 +96,12 @@ pub const INITIAL_COMMITMENT_NUMBER: u64 = 0; const RETRYABLE_TLC_OPS_INTERVAL: Duration = Duration::from_millis(1000); +// if an important TLC operation is not acked in 30 seconds, we will try to disconnect the peer. +#[cfg(not(test))] +pub const PEER_CHANNEL_RESPONSE_TIMEOUT: u64 = 30 * 1000; +#[cfg(test)] +pub const PEER_CHANNEL_RESPONSE_TIMEOUT: u64 = 5 * 1000; + #[derive(Debug)] pub enum ChannelActorMessage { /// Command are the messages that are sent to the channel actor to perform some action. 
@@ -1242,10 +1248,10 @@ where state.maybe_transition_to_tx_signatures(flags, &self.network)?; } CommitmentSignedFlags::ChannelReady() => { - state.tlc_state.set_waiting_ack(true); + state.set_waiting_ack(&self.network, true); } CommitmentSignedFlags::PendingShutdown() => { - state.tlc_state.set_waiting_ack(true); + state.set_waiting_ack(&self.network, true); state.maybe_transition_to_shutdown(&self.network)?; } } @@ -1258,13 +1264,6 @@ where state: &mut ChannelActorState, command: AddTlcCommand, ) -> Result { - if !state.local_tlc_info.enabled { - return Err(ProcessingChannelError::InvalidState(format!( - "TLC forwarding is not enabled for channel {}", - state.get_id() - ))); - } - state.check_for_tlc_update(Some(command.amount), true, true)?; state.check_tlc_expiry(command.expiry)?; state.check_tlc_forward_amount( @@ -1406,21 +1405,19 @@ where )) .expect(ASSUME_NETWORK_ACTOR_ALIVE); - let shutdown_info = ShutdownInfo { + state.local_shutdown_info = Some(ShutdownInfo { close_script: command.close_script, fee_rate: command.fee_rate.as_u64(), signature: None, - }; - state.local_shutdown_info = Some(shutdown_info); + }); state.update_state(ChannelState::ShuttingDown( flags | ShuttingDownFlags::OUR_SHUTDOWN_SENT, )); + debug!( "Channel state updated to {:?} after processing shutdown command", &state.state ); - eprintln!("now here ...."); - state.maybe_transition_to_shutdown(&self.network) } } @@ -1684,7 +1681,6 @@ where error, ) .await; - eprintln!("forward tlc error, remove tlc_Op: {:?}", tlc_op); state.tlc_state.remove_pending_tlc_operation(tlc_op); } } @@ -2347,15 +2343,12 @@ where } } ChannelActorMessage::Command(command) => { - //error!("exe command {}", command); if let Err(err) = self.handle_command(&myself, state, command).await { - if !matches!(err, ProcessingChannelError::WaitingTlcAck) { - error!( - "{:?} Error while processing channel command: {:?}", - state.get_local_peer_id(), - err, - ); - } + error!( + "{:?} Error while processing channel command: 
{:?}", state.get_local_peer_id(), err, ); } } ChannelActorMessage::Event(e) => { @@ -2979,7 +2972,6 @@ impl TlcState { } pub fn update_for_revoke_and_ack(&mut self, commitment_number: CommitmentNumbers) -> bool { - self.set_waiting_ack(false); for tlc in self.offered_tlcs.tlcs.iter_mut() { match tlc.outbound_status() { OutboundTlcStatus::LocalAnnounced => { @@ -3217,6 +3209,11 @@ pub struct ChannelActorState { pub reestablishing: bool, pub created_at: SystemTime, + + // the timestamp we last sent a message to the peer, used to check if the peer is still alive + // we will disconnect the peer if we haven't sent any message to the peer for a long time + // currently we have only set commitment_signed as the heartbeat message, + pub waiting_peer_response: Option, } #[serde_as] @@ -3611,6 +3608,41 @@ impl ChannelActorState { self.local_tlc_info.enabled } + pub fn set_waiting_peer_response(&mut self) { + self.waiting_peer_response = Some(now_timestamp_as_millis_u64()); + } + + pub fn clear_waiting_peer_response(&mut self) { + self.waiting_peer_response = None; + } + + pub fn should_disconnect_peer_awaiting_response(&self) -> bool { + if let Some(timestamp) = self.waiting_peer_response { + let elapsed = now_timestamp_as_millis_u64() - timestamp; + elapsed > PEER_CHANNEL_RESPONSE_TIMEOUT + } else { + false + } + } + + pub fn set_waiting_ack(&mut self, network: &ActorRef, waiting_ack: bool) { + self.tlc_state.set_waiting_ack(waiting_ack); + if waiting_ack { + self.set_waiting_peer_response(); + let channel_id = self.get_id(); + network.send_after( + Duration::from_millis(PEER_CHANNEL_RESPONSE_TIMEOUT), + move || { + NetworkActorMessage::Command(NetworkActorCommand::CheckActiveChannel( + channel_id, + )) + }, + ); + } else { + self.clear_waiting_peer_response(); + } + } + pub async fn try_create_channel_messages( &mut self, network: &ActorRef, @@ -3984,6 +4016,7 @@ impl ChannelActorState { latest_commitment_transaction: None, reestablishing: false, created_at: 
SystemTime::now(), + waiting_peer_response: None, }; if let Some(nonce) = remote_channel_announcement_nonce { state.update_remote_channel_announcement_nonce(&nonce); @@ -4052,6 +4085,7 @@ impl ChannelActorState { latest_commitment_transaction: None, reestablishing: false, created_at: SystemTime::now(), + waiting_peer_response: None, } } @@ -5174,6 +5208,12 @@ impl ChannelActorState { } if let Some(add_amount) = add_tlc_amount { + if is_tlc_command_message && !self.local_tlc_info.enabled { + return Err(ProcessingChannelError::InvalidState(format!( + "TLC forwarding is not enabled for channel {}", + self.get_id() + ))); + } self.check_tlc_limits(add_amount, is_sent)?; } @@ -6053,6 +6093,7 @@ impl ChannelActorState { let need_commitment_signed = self .tlc_state .update_for_revoke_and_ack(self.commitment_numbers); + self.set_waiting_ack(network, false); network .send_message(NetworkActorMessage::new_notification( NetworkServiceEvent::RevokeAndAckReceived( @@ -6139,7 +6180,7 @@ impl ChannelActorState { } // previous waiting_ack maybe true, reset it after reestablish the channel // if we need to resend CommitmentSigned message, it will be set to proper status again - self.tlc_state.set_waiting_ack(false); + self.set_waiting_ack(network, false); debug!( "Resend AddTlc and RemoveTlc messages if needed: {}", need_resend_commitment_signed diff --git a/src/fiber/network.rs b/src/fiber/network.rs index a4420f2d9..df1d2b5b4 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -201,6 +201,8 @@ pub enum NetworkActorCommand { SavePeerAddress(Multiaddr), // We need to maintain a certain number of peers connections to keep the network running. MaintainConnections, + // Check active channel connections and disconnect inactive peers + CheckActiveChannel(Hash256), // For internal use and debugging only. Most of the messages requires some // changes to local state. Even if we can send a message to a peer, some // part of the local state is not changed. 
@@ -1098,6 +1100,25 @@ where } } + NetworkActorCommand::CheckActiveChannel(channel_id) => { + if let Some(channel_actor_state) = self.store.get_channel_actor_state(&channel_id) { + if channel_actor_state.should_disconnect_peer_awaiting_response() + && !channel_actor_state.is_closed() + { + debug!( + "Channel {} from peer {:?} is inactive for a time, closing it", + channel_id, + channel_actor_state.get_remote_peer_id() + ); + myself.send_message(NetworkActorMessage::new_command( + NetworkActorCommand::DisconnectPeer( + channel_actor_state.get_remote_peer_id(), + ), + ))?; + } + } + } + NetworkActorCommand::OpenChannel(open_channel, reply) => { match state.create_outbound_channel(open_channel).await { Ok((_, channel_id)) => { @@ -2518,7 +2539,10 @@ where format!("Channel {:x} is already closed", &channel_id), ))); } + } else { + return Err(Error::ChannelNotFound(channel_id)); } + let remote_pubkey = self.get_peer_pubkey(peer_id) .ok_or(ProcessingChannelError::InvalidState(format!( diff --git a/src/fiber/tests/payment.rs b/src/fiber/tests/payment.rs index bb7d39d4b..291c06271 100644 --- a/src/fiber/tests/payment.rs +++ b/src/fiber/tests/payment.rs @@ -2256,7 +2256,6 @@ async fn test_send_payment_shutdown_with_force() { let mut all_sent = HashSet::new(); for i in 0..10 { let res = nodes[0].send_payment_keysend(&nodes[3], 1000, false).await; - eprintln!("res: {:?}", res); if let Ok(send_payment_res) = res { if i > 5 { all_sent.insert(send_payment_res.payment_hash); @@ -2278,24 +2277,27 @@ async fn test_send_payment_shutdown_with_force() { } } - // let mut failed_count = 0; - // while !all_sent.is_empty() { - // for payment_hash in all_sent.clone().iter() { - // let res = nodes[0].get_payment_result(*payment_hash).await; - // eprintln!( - // "payment_hasfh: {:?} status: {:?} failed_count: {:?}", - // payment_hash, res.status, failed_count - // ); - // if res.status == PaymentSessionStatus::Failed { - // failed_count += 1; - // all_sent.remove(payment_hash); - // } - - // 
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - // } - // } - // assert_eq!(failed_count, 4); - // tokio::time::sleep(tokio::time::Duration::from_millis(1000 * 10)).await; + // make sure the later payments will fail + // because network actor will find out the inactive channels and disconnect peers + // which send shutdown force message + let mut failed_count = 0; + let expect_failed_count = all_sent.len(); + while !all_sent.is_empty() { + for payment_hash in all_sent.clone().iter() { + let res = nodes[0].get_payment_result(*payment_hash).await; + eprintln!( + "payment_hash: {:?} status: {:?} failed_count: {:?}", + payment_hash, res.status, failed_count + ); + if res.status == PaymentSessionStatus::Failed { + failed_count += 1; + all_sent.remove(payment_hash); + } + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + } + assert!(failed_count >= expect_failed_count); } #[tokio::test] diff --git a/src/store/tests/store.rs b/src/store/tests/store.rs index 58e804182..ce872d9a7 100644 --- a/src/store/tests/store.rs +++ b/src/store/tests/store.rs @@ -378,6 +378,7 @@ fn test_channel_actor_state_store() { remote_constraints: ChannelConstraints::default(), reestablishing: false, created_at: SystemTime::now(), + waiting_peer_response: None, }; let bincode_encoded = bincode::serialize(&state).unwrap(); @@ -489,6 +490,7 @@ fn test_serde_channel_actor_state_ciborium() { remote_constraints: ChannelConstraints::default(), reestablishing: false, created_at: SystemTime::now(), + waiting_peer_response: None, }; let mut serialized = Vec::new();