Skip to content

Commit

Permalink
save and reprocess payment sent message
Browse files Browse the repository at this point in the history
  • Loading branch information
woodser committed Mar 3, 2025
1 parent aee7512 commit 5a297f5
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 4 deletions.
5 changes: 3 additions & 2 deletions core/src/main/java/haveno/core/trade/Trade.java
Original file line number Diff line number Diff line change
Expand Up @@ -2436,8 +2436,9 @@ private void tryInitPollingAux() {
if (!wasWalletSynced) trySyncWallet(true);
updatePollPeriod();

// reprocess pending payout messages
this.getProtocol().maybeReprocessPaymentReceivedMessage(false);
// reprocess pending messages
getProtocol().maybeReprocessPaymentSentMessage(false);
getProtocol().maybeReprocessPaymentReceivedMessage(false);
HavenoUtils.arbitrationManager.maybeReprocessDisputeClosedMessage(this, false);

startPolling();
Expand Down
49 changes: 48 additions & 1 deletion core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
protected ErrorMessageHandler errorMessageHandler;

private boolean depositsConfirmedTasksCalled;
private int reprocessPaymentSentMessageCount;
private int reprocessPaymentReceivedMessageCount;

///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -279,6 +280,22 @@ public void maybeSendDepositsConfirmedMessages() {
}, trade.getId());
}

public void maybeReprocessPaymentSentMessage(boolean reprocessOnError) {
if (trade.isShutDownStarted()) return;
ThreadUtils.execute(() -> {
synchronized (trade.getLock()) {

// skip if no need to reprocess
if (trade.isBuyer() || trade.getBuyer().getPaymentSentMessage() == null || trade.getState().ordinal() >= Trade.State.BUYER_SENT_PAYMENT_SENT_MSG.ordinal()) {
return;
}

log.warn("Reprocessing payment sent message for {} {}", trade.getClass().getSimpleName(), trade.getId());
handle(trade.getBuyer().getPaymentSentMessage(), trade.getBuyer().getPaymentSentMessage().getSenderNodeAddress(), reprocessOnError);
}
}, trade.getId());
}

public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) {
if (trade.isShutDownStarted()) return;
ThreadUtils.execute(() -> {
Expand Down Expand Up @@ -481,7 +498,25 @@ public void handle(DepositsConfirmedMessage message, NodeAddress sender) {

// received by seller and arbitrator
protected void handle(PaymentSentMessage message, NodeAddress peer) {
handle(message, peer, true);
}

// received by seller and arbitrator
protected void handle(PaymentSentMessage message, NodeAddress peer, boolean reprocessOnError) {
System.out.println(getClass().getSimpleName() + ".handle(PaymentSentMessage) for " + trade.getClass().getSimpleName() + " " + trade.getShortId());

// validate signature
try {
HavenoUtils.verifyPaymentSentMessage(trade, message);
} catch (Throwable t) {
log.warn("Ignoring PaymentSentMessage with invalid signature for {} {}, error={}", trade.getClass().getSimpleName(), trade.getId(), t.getMessage());
return;
}

// save message for reprocessing
trade.getBuyer().setPaymentSentMessage(message);
trade.requestPersistence();

if (!trade.isInitialized() || trade.isShutDown()) return;
if (!(trade instanceof SellerTrade || trade instanceof ArbitratorTrade)) {
log.warn("Ignoring PaymentSentMessage since not seller or arbitrator");
Expand Down Expand Up @@ -521,7 +556,19 @@ protected void handle(PaymentSentMessage message, NodeAddress peer) {
handleTaskRunnerSuccess(peer, message);
},
(errorMessage) -> {
handleTaskRunnerFault(peer, message, errorMessage);
log.warn("Error processing payment sent message: " + errorMessage);
processModel.getTradeManager().requestPersistence();

// schedule to reprocess message unless deleted
if (trade.getBuyer().getPaymentSentMessage() != null) {
UserThread.runAfter(() -> {
reprocessPaymentSentMessageCount++;
maybeReprocessPaymentSentMessage(reprocessOnError);
}, trade.getReprocessDelayInSeconds(reprocessPaymentSentMessageCount));
} else {
handleTaskRunnerFault(peer, message, errorMessage); // otherwise send nack
}
unlatchTrade();
})))
.executeTasks(true);
awaitTradeLatch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ protected void run() {
trade.getBuyer().setNodeAddress(processModel.getTempTradePeerNodeAddress());

// update state from message
trade.getBuyer().setPaymentSentMessage(message);
trade.getBuyer().setUpdatedMultisigHex(message.getUpdatedMultisigHex());
trade.getSeller().setAccountAgeWitness(message.getSellerAccountAgeWitness());
String counterCurrencyTxId = message.getCounterCurrencyTxId();
Expand Down

0 comments on commit 5a297f5

Please sign in to comment.