From 5a297f547c7421ce79b29c05faa713c18329bc29 Mon Sep 17 00:00:00 2001 From: woodser <13068859+woodser@users.noreply.github.com> Date: Mon, 3 Mar 2025 08:51:32 -0500 Subject: [PATCH] save and reprocess payment sent message --- .../main/java/haveno/core/trade/Trade.java | 5 +- .../core/trade/protocol/TradeProtocol.java | 49 ++++++++++++++++++- .../tasks/ProcessPaymentSentMessage.java | 1 - 3 files changed, 51 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/haveno/core/trade/Trade.java b/core/src/main/java/haveno/core/trade/Trade.java index 3084aa77f18..f50d9b9810e 100644 --- a/core/src/main/java/haveno/core/trade/Trade.java +++ b/core/src/main/java/haveno/core/trade/Trade.java @@ -2436,8 +2436,9 @@ private void tryInitPollingAux() { if (!wasWalletSynced) trySyncWallet(true); updatePollPeriod(); - // reprocess pending payout messages - this.getProtocol().maybeReprocessPaymentReceivedMessage(false); + // reprocess pending messages + getProtocol().maybeReprocessPaymentSentMessage(false); + getProtocol().maybeReprocessPaymentReceivedMessage(false); HavenoUtils.arbitrationManager.maybeReprocessDisputeClosedMessage(this, false); startPolling(); diff --git a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java index 117c16cc050..6400e69f6a8 100644 --- a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java @@ -106,6 +106,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D protected ErrorMessageHandler errorMessageHandler; private boolean depositsConfirmedTasksCalled; + private int reprocessPaymentSentMessageCount; private int reprocessPaymentReceivedMessageCount; /////////////////////////////////////////////////////////////////////////////////////////// @@ -279,6 +280,22 @@ public void maybeSendDepositsConfirmedMessages() { }, trade.getId()); } + public void maybeReprocessPaymentSentMessage(boolean reprocessOnError) { + if (trade.isShutDownStarted()) return; + ThreadUtils.execute(() -> { + synchronized (trade.getLock()) { + + // skip if no need to reprocess + if (trade.isBuyer() || trade.getBuyer().getPaymentSentMessage() == null || trade.getState().ordinal() >= Trade.State.BUYER_SENT_PAYMENT_SENT_MSG.ordinal()) { + return; + } + + log.warn("Reprocessing payment sent message for {} {}", trade.getClass().getSimpleName(), trade.getId()); + handle(trade.getBuyer().getPaymentSentMessage(), trade.getBuyer().getPaymentSentMessage().getSenderNodeAddress(), reprocessOnError); + } + }, trade.getId()); + } + public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) { if (trade.isShutDownStarted()) return; ThreadUtils.execute(() -> { @@ -481,7 +498,25 @@ public void handle(DepositsConfirmedMessage message, NodeAddress sender) { // received by seller and arbitrator protected void handle(PaymentSentMessage message, NodeAddress peer) { + handle(message, peer, true); + } + + // received by seller and arbitrator + protected void handle(PaymentSentMessage message, NodeAddress peer, boolean reprocessOnError) { System.out.println(getClass().getSimpleName() + ".handle(PaymentSentMessage) for " + trade.getClass().getSimpleName() + " " + trade.getShortId()); + + // validate signature + try { + HavenoUtils.verifyPaymentSentMessage(trade, message); + } catch (Throwable t) { + log.warn("Ignoring PaymentSentMessage with invalid signature for {} {}, error={}", trade.getClass().getSimpleName(), trade.getId(), t.getMessage()); + return; + } + + // save message for reprocessing + trade.getBuyer().setPaymentSentMessage(message); + trade.requestPersistence(); + if (!trade.isInitialized() || trade.isShutDown()) return; if (!(trade instanceof SellerTrade || trade instanceof ArbitratorTrade)) { log.warn("Ignoring PaymentSentMessage since not seller or arbitrator"); @@ -521,7 +556,19 @@ protected void handle(PaymentSentMessage message, NodeAddress peer) { handleTaskRunnerSuccess(peer, message); }, (errorMessage) -> { - handleTaskRunnerFault(peer, message, errorMessage); + log.warn("Error processing payment sent message: " + errorMessage); + processModel.getTradeManager().requestPersistence(); + + // schedule to reprocess message unless deleted + if (trade.getBuyer().getPaymentSentMessage() != null) { + UserThread.runAfter(() -> { + reprocessPaymentSentMessageCount++; + maybeReprocessPaymentSentMessage(reprocessOnError); + }, trade.getReprocessDelayInSeconds(reprocessPaymentSentMessageCount)); + } else { + handleTaskRunnerFault(peer, message, errorMessage); // otherwise send nack + } + unlatchTrade(); }))) .executeTasks(true); awaitTradeLatch(); diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessPaymentSentMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessPaymentSentMessage.java index 93d1ce520aa..de7d949ade8 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessPaymentSentMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessPaymentSentMessage.java @@ -48,7 +48,6 @@ protected void run() { trade.getBuyer().setNodeAddress(processModel.getTempTradePeerNodeAddress()); // update state from message - trade.getBuyer().setPaymentSentMessage(message); trade.getBuyer().setUpdatedMultisigHex(message.getUpdatedMultisigHex()); trade.getSeller().setAccountAgeWitness(message.getSellerAccountAgeWitness()); String counterCurrencyTxId = message.getCounterCurrencyTxId();