add cron job to check active channels
chenyukang committed Feb 21, 2025
1 parent f71ebd5 commit adc0a62
Showing 4 changed files with 113 additions and 44 deletions.
91 changes: 66 additions & 25 deletions src/fiber/channel.rs
@@ -96,6 +96,12 @@ pub const INITIAL_COMMITMENT_NUMBER: u64 = 0;

const RETRYABLE_TLC_OPS_INTERVAL: Duration = Duration::from_millis(1000);

// if an important TLC operation is not acked within 30 seconds, we will try to disconnect the peer.
#[cfg(not(test))]
pub const PEER_CHANNEL_RESPONSE_TIMEOUT: u64 = 30 * 1000;
#[cfg(test)]
pub const PEER_CHANNEL_RESPONSE_TIMEOUT: u64 = 5 * 1000;

#[derive(Debug)]
pub enum ChannelActorMessage {
/// Command are the messages that are sent to the channel actor to perform some action.
@@ -1242,10 +1248,10 @@ where
state.maybe_transition_to_tx_signatures(flags, &self.network)?;
}
CommitmentSignedFlags::ChannelReady() => {
state.tlc_state.set_waiting_ack(true);
state.set_waiting_ack(&self.network, true);
}
CommitmentSignedFlags::PendingShutdown() => {
state.tlc_state.set_waiting_ack(true);
state.set_waiting_ack(&self.network, true);
state.maybe_transition_to_shutdown(&self.network)?;
}
}
@@ -1258,13 +1264,6 @@
state: &mut ChannelActorState,
command: AddTlcCommand,
) -> Result<u64, ProcessingChannelError> {
if !state.local_tlc_info.enabled {
return Err(ProcessingChannelError::InvalidState(format!(
"TLC forwarding is not enabled for channel {}",
state.get_id()
)));
}

state.check_for_tlc_update(Some(command.amount), true, true)?;
state.check_tlc_expiry(command.expiry)?;
state.check_tlc_forward_amount(
@@ -1406,21 +1405,19 @@ where
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);

let shutdown_info = ShutdownInfo {
state.local_shutdown_info = Some(ShutdownInfo {
close_script: command.close_script,
fee_rate: command.fee_rate.as_u64(),
signature: None,
};
state.local_shutdown_info = Some(shutdown_info);
});
state.update_state(ChannelState::ShuttingDown(
flags | ShuttingDownFlags::OUR_SHUTDOWN_SENT,
));

debug!(
"Channel state updated to {:?} after processing shutdown command",
&state.state
);
eprintln!("now here ....");

state.maybe_transition_to_shutdown(&self.network)
}
}
@@ -1684,7 +1681,6 @@ where
error,
)
.await;
eprintln!("forward tlc error, remove tlc_Op: {:?}", tlc_op);
state.tlc_state.remove_pending_tlc_operation(tlc_op);
}
}
@@ -2347,15 +2343,12 @@ where
}
}
ChannelActorMessage::Command(command) => {
//error!("exe command {}", command);
if let Err(err) = self.handle_command(&myself, state, command).await {
if !matches!(err, ProcessingChannelError::WaitingTlcAck) {
error!(
"{:?} Error while processing channel command: {:?}",
state.get_local_peer_id(),
err,
);
}
error!(
"{:?} Error while processing channel command: {:?}",
state.get_local_peer_id(),
err,
);
}
}
ChannelActorMessage::Event(e) => {
@@ -2979,7 +2972,6 @@ impl TlcState {
}

pub fn update_for_revoke_and_ack(&mut self, commitment_number: CommitmentNumbers) -> bool {
self.set_waiting_ack(false);
for tlc in self.offered_tlcs.tlcs.iter_mut() {
match tlc.outbound_status() {
OutboundTlcStatus::LocalAnnounced => {
@@ -3217,6 +3209,11 @@ pub struct ChannelActorState {
pub reestablishing: bool,

pub created_at: SystemTime,

// the timestamp when we last sent a message to the peer, used to check if the peer is still alive;
// we will disconnect the peer if we don't receive a response for a long time.
// currently only commitment_signed is used as the heartbeat message.
pub waiting_peer_response: Option<u64>,
}

#[serde_as]
@@ -3611,6 +3608,41 @@ impl ChannelActorState {
self.local_tlc_info.enabled
}

pub fn set_waiting_peer_response(&mut self) {
self.waiting_peer_response = Some(now_timestamp_as_millis_u64());
}

pub fn clear_waiting_peer_response(&mut self) {
self.waiting_peer_response = None;
}

pub fn should_disconnect_peer_awaiting_response(&self) -> bool {
if let Some(timestamp) = self.waiting_peer_response {
let elapsed = now_timestamp_as_millis_u64() - timestamp;
elapsed > PEER_CHANNEL_RESPONSE_TIMEOUT
} else {
false
}
}

pub fn set_waiting_ack(&mut self, network: &ActorRef<NetworkActorMessage>, waiting_ack: bool) {
self.tlc_state.set_waiting_ack(waiting_ack);
if waiting_ack {
self.set_waiting_peer_response();
let channel_id = self.get_id();
network.send_after(
Duration::from_millis(PEER_CHANNEL_RESPONSE_TIMEOUT),
move || {
NetworkActorMessage::Command(NetworkActorCommand::CheckActiveChannel(
channel_id,
))
},
);
} else {
self.clear_waiting_peer_response();
}
}

pub async fn try_create_channel_messages(
&mut self,
network: &ActorRef<NetworkActorMessage>,
@@ -3984,6 +4016,7 @@ impl ChannelActorState {
latest_commitment_transaction: None,
reestablishing: false,
created_at: SystemTime::now(),
waiting_peer_response: None,
};
if let Some(nonce) = remote_channel_announcement_nonce {
state.update_remote_channel_announcement_nonce(&nonce);
@@ -4052,6 +4085,7 @@ impl ChannelActorState {
latest_commitment_transaction: None,
reestablishing: false,
created_at: SystemTime::now(),
waiting_peer_response: None,
}
}

@@ -5174,6 +5208,12 @@ impl ChannelActorState {
}

if let Some(add_amount) = add_tlc_amount {
if is_tlc_command_message && !self.local_tlc_info.enabled {
return Err(ProcessingChannelError::InvalidState(format!(
"TLC forwarding is not enabled for channel {}",
self.get_id()
)));
}
self.check_tlc_limits(add_amount, is_sent)?;
}

@@ -6053,6 +6093,7 @@ impl ChannelActorState {
let need_commitment_signed = self
.tlc_state
.update_for_revoke_and_ack(self.commitment_numbers);
self.set_waiting_ack(network, false);
network
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::RevokeAndAckReceived(
@@ -6139,7 +6180,7 @@ impl ChannelActorState {
}
// the previous waiting_ack may be true, reset it after reestablishing the channel;
// if we need to resend the CommitmentSigned message, it will be set to the proper status again
self.tlc_state.set_waiting_ack(false);
self.set_waiting_ack(network, false);
debug!(
"Resend AddTlc and RemoveTlc messages if needed: {}",
need_resend_commitment_signed
24 changes: 24 additions & 0 deletions src/fiber/network.rs
@@ -201,6 +201,8 @@ pub enum NetworkActorCommand {
SavePeerAddress(Multiaddr),
// We need to maintain a certain number of peers connections to keep the network running.
MaintainConnections,
// Check active channel connections and disconnect inactive peers
CheckActiveChannel(Hash256),
// For internal use and debugging only. Most of the messages requires some
// changes to local state. Even if we can send a message to a peer, some
// part of the local state is not changed.
@@ -1098,6 +1100,25 @@
}
}

NetworkActorCommand::CheckActiveChannel(channel_id) => {
if let Some(channel_actor_state) = self.store.get_channel_actor_state(&channel_id) {
if channel_actor_state.should_disconnect_peer_awaiting_response()
&& !channel_actor_state.is_closed()
{
debug!(
"Channel {} from peer {:?} is inactive for a time, closing it",
channel_id,
channel_actor_state.get_remote_peer_id()
);
myself.send_message(NetworkActorMessage::new_command(
NetworkActorCommand::DisconnectPeer(
channel_actor_state.get_remote_peer_id(),
),
))?;
}
}
}

NetworkActorCommand::OpenChannel(open_channel, reply) => {
match state.create_outbound_channel(open_channel).await {
Ok((_, channel_id)) => {
@@ -2518,7 +2539,10 @@ where
format!("Channel {:x} is already closed", &channel_id),
)));
}
} else {
return Err(Error::ChannelNotFound(channel_id));
}

let remote_pubkey =
self.get_peer_pubkey(peer_id)
.ok_or(ProcessingChannelError::InvalidState(format!(
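How the two halves fit together: set_waiting_ack(true) in channel.rs records a timestamp in waiting_peer_response and schedules a CheckActiveChannel command after PEER_CHANNEL_RESPONSE_TIMEOUT via network.send_after; when that command fires, the network actor disconnects the peer if the timestamp is still set and older than the timeout (i.e. the peer's RevokeAndAck never cleared it). The following is a minimal, self-contained sketch of just that timeout check, using simplified standalone types rather than the project's actual structs or actor plumbing:

// Minimal sketch of the peer-response timeout introduced in this commit.
// Timestamps are plain u64 milliseconds here instead of the project's helpers.
const PEER_CHANNEL_RESPONSE_TIMEOUT: u64 = 30 * 1000;

#[derive(Default)]
struct ChannelTimeoutState {
    // Set when we send a message that expects an ack (e.g. CommitmentSigned),
    // cleared when the peer's RevokeAndAck arrives.
    waiting_peer_response: Option<u64>,
}

impl ChannelTimeoutState {
    fn set_waiting_peer_response(&mut self, now_ms: u64) {
        self.waiting_peer_response = Some(now_ms);
    }

    fn clear_waiting_peer_response(&mut self) {
        self.waiting_peer_response = None;
    }

    // Mirrors should_disconnect_peer_awaiting_response: true once the peer
    // has been silent for longer than the timeout.
    fn should_disconnect(&self, now_ms: u64) -> bool {
        self.waiting_peer_response
            .map(|sent_at| now_ms.saturating_sub(sent_at) > PEER_CHANNEL_RESPONSE_TIMEOUT)
            .unwrap_or(false)
    }
}

fn main() {
    let mut state = ChannelTimeoutState::default();
    state.set_waiting_peer_response(0);
    assert!(!state.should_disconnect(10_000)); // still inside the 30s window
    assert!(state.should_disconnect(31_000));  // no response in time: disconnect
    state.clear_waiting_peer_response();
    assert!(!state.should_disconnect(60_000)); // ack received, nothing to do
}

In the actual change this check is driven by the actor system (send_after plus the CheckActiveChannel command), so the channel state itself never has to own a timer.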
40 changes: 21 additions & 19 deletions src/fiber/tests/payment.rs
@@ -2256,7 +2256,6 @@ async fn test_send_payment_shutdown_with_force() {
let mut all_sent = HashSet::new();
for i in 0..10 {
let res = nodes[0].send_payment_keysend(&nodes[3], 1000, false).await;
eprintln!("res: {:?}", res);
if let Ok(send_payment_res) = res {
if i > 5 {
all_sent.insert(send_payment_res.payment_hash);
@@ -2278,24 +2277,27 @@
}
}

// let mut failed_count = 0;
// while !all_sent.is_empty() {
// for payment_hash in all_sent.clone().iter() {
// let res = nodes[0].get_payment_result(*payment_hash).await;
// eprintln!(
// "payment_hasfh: {:?} status: {:?} failed_count: {:?}",
// payment_hash, res.status, failed_count
// );
// if res.status == PaymentSessionStatus::Failed {
// failed_count += 1;
// all_sent.remove(payment_hash);
// }

// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// }
// }
// assert_eq!(failed_count, 4);
// tokio::time::sleep(tokio::time::Duration::from_millis(1000 * 10)).await;
// make sure the later payments will fail,
// because the network actor will detect the inactive channels and disconnect the peers
// that sent the force shutdown message
let mut failed_count = 0;
let expect_failed_count = all_sent.len();
while !all_sent.is_empty() {
for payment_hash in all_sent.clone().iter() {
let res = nodes[0].get_payment_result(*payment_hash).await;
eprintln!(
"payment_hash: {:?} status: {:?} failed_count: {:?}",
payment_hash, res.status, failed_count
);
if res.status == PaymentSessionStatus::Failed {
failed_count += 1;
all_sent.remove(payment_hash);
}

tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}
assert!(failed_count >= expect_failed_count);
}

#[tokio::test]
2 changes: 2 additions & 0 deletions src/store/tests/store.rs
@@ -378,6 +378,7 @@ fn test_channel_actor_state_store() {
remote_constraints: ChannelConstraints::default(),
reestablishing: false,
created_at: SystemTime::now(),
waiting_peer_response: None,
};

let bincode_encoded = bincode::serialize(&state).unwrap();
@@ -489,6 +490,7 @@ fn test_serde_channel_actor_state_ciborium() {
remote_constraints: ChannelConstraints::default(),
reestablishing: false,
created_at: SystemTime::now(),
waiting_peer_response: None,
};

let mut serialized = Vec::new();
