diff --git a/core/src/main/java/haveno/core/trade/Trade.java b/core/src/main/java/haveno/core/trade/Trade.java index 6ad8e7aef33..6e9086e1efd 100644 --- a/core/src/main/java/haveno/core/trade/Trade.java +++ b/core/src/main/java/haveno/core/trade/Trade.java @@ -143,6 +143,7 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model { private static final long DELETE_AFTER_NUM_BLOCKS = 2; // if deposit requested but not published private static final long EXTENDED_RPC_TIMEOUT = 600000; // 10 minutes private static final long DELETE_AFTER_MS = TradeProtocol.TRADE_STEP_TIMEOUT_SECONDS; + private static final int NUM_CONFIRMATIONS_FOR_SCHEDULED_IMPORT = 10; protected final Object pollLock = new Object(); protected static final Object importMultisigLock = new Object(); private boolean pollInProgress; @@ -735,12 +736,17 @@ public void initialize(ProcessModelServiceProvider serviceProvider) { // TODO: buyer's payment sent message state property became unsynced if shut down while awaiting ack from seller. fixed in v1.0.19 so this check can be removed? if (isBuyer()) { MessageState expectedState = getPaymentSentMessageState(); - if (expectedState != null && expectedState != processModel.getPaymentSentMessageStateProperty().get()) { - log.warn("Updating unexpected payment sent message state for {} {}, expected={}, actual={}", getClass().getSimpleName(), getId(), expectedState, processModel.getPaymentSentMessageStateProperty().get()); - processModel.getPaymentSentMessageStateProperty().set(expectedState); + if (expectedState != null && expectedState != processModel.getPaymentSentMessageStatePropertySeller().get()) { + log.warn("Updating unexpected payment sent message state for {} {}, expected={}, actual={}", getClass().getSimpleName(), getId(), expectedState, processModel.getPaymentSentMessageStatePropertySeller().get()); + processModel.getPaymentSentMessageStatePropertySeller().set(expectedState); } } + // handle confirmations + walletHeight.addListener((observable, oldValue, newValue) -> { + importMultisigHexIfScheduled(); + }); + // trade is initialized isInitialized = true; @@ -1077,6 +1083,26 @@ public void importMultisigHexIfNeeded() { } } + public void scheduleImportMultisigHex() { + processModel.setImportMultisigHexScheduled(true); + requestPersistence(); + } + + private void importMultisigHexIfScheduled() { + if (!isInitialized || isShutDownStarted) return; + if (!isDepositsConfirmed() || getMaker().getDepositTx() == null) return; + if (walletHeight.get() - getMaker().getDepositTx().getHeight() < NUM_CONFIRMATIONS_FOR_SCHEDULED_IMPORT) return; + ThreadUtils.execute(() -> { + if (!isInitialized || isShutDownStarted) return; + synchronized (getLock()) { + if (processModel.isImportMultisigHexScheduled()) { + processModel.setImportMultisigHexScheduled(false); + ThreadUtils.submitToPool(() -> importMultisigHex()); + } + } + }, getId()); + } + public void importMultisigHex() { synchronized (walletLock) { synchronized (HavenoUtils.getDaemonLock()) { // lock on daemon because import calls full refresh @@ -1089,10 +1115,10 @@ public void importMultisigHex() { } catch (IllegalArgumentException | IllegalStateException e) { throw e; } catch (Exception e) { + log.warn("Failed to import multisig hex, tradeId={}, attempt={}/{}, error={}", getShortId(), i + 1, TradeProtocol.MAX_ATTEMPTS, e.getMessage()); handleWalletError(e, sourceConnection); doPollWallet(); if (isPayoutPublished()) break; - log.warn("Failed to import multisig hex, tradeId={}, attempt={}/{}, error={}", getShortId(), i + 1, TradeProtocol.MAX_ATTEMPTS, e.getMessage()); if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e; HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying } @@ -1141,6 +1167,9 @@ private void doImportMultisigHex() { if (removed) wallet.importMultisigHex(multisigHexes.toArray(new String[0])); if (wallet.isMultisigImportNeeded()) throw new IllegalStateException(errorMessage); } + + // remove scheduled import + processModel.setImportMultisigHexScheduled(false); } catch (MoneroError e) { // import multisig hex individually if one is invalid @@ -2017,7 +2046,7 @@ public String getRole() { public MessageState getPaymentSentMessageState() { if (isPaymentReceived()) return MessageState.ACKNOWLEDGED; - if (processModel.getPaymentSentMessageStateProperty().get() == MessageState.ACKNOWLEDGED) return MessageState.ACKNOWLEDGED; + if (processModel.getPaymentSentMessageStatePropertySeller().get() == MessageState.ACKNOWLEDGED) return MessageState.ACKNOWLEDGED; switch (state) { case BUYER_SENT_PAYMENT_SENT_MSG: return MessageState.SENT; @@ -2345,7 +2374,12 @@ public boolean shouldPublishTradeStatistics() { return tradeAmountTransferred(); } - public boolean tradeAmountTransferred() { + + /////////////////////////////////////////////////////////////////////////////////////////// + // Private + /////////////////////////////////////////////////////////////////////////////////////////// + + private boolean tradeAmountTransferred() { return isPaymentReceived() || (getDisputeResult() != null && getDisputeResult().getWinner() == DisputeResult.Winner.SELLER); } @@ -2361,11 +2395,6 @@ private void doPublishTradeStatistics() { } } - - /////////////////////////////////////////////////////////////////////////////////////////// - // Private - /////////////////////////////////////////////////////////////////////////////////////////// - // lazy initialization private ObjectProperty getAmountProperty() { if (tradeAmountProperty == null) @@ -2436,8 +2465,9 @@ private void tryInitPollingAux() { if (!wasWalletSynced) trySyncWallet(true); updatePollPeriod(); - // reprocess pending payout messages - this.getProtocol().maybeReprocessPaymentReceivedMessage(false); + // reprocess pending messages + getProtocol().maybeReprocessPaymentSentMessage(false); + getProtocol().maybeReprocessPaymentReceivedMessage(false); HavenoUtils.arbitrationManager.maybeReprocessDisputeClosedMessage(this, false); startPolling(); diff --git a/core/src/main/java/haveno/core/trade/TradeManager.java b/core/src/main/java/haveno/core/trade/TradeManager.java index caabdb384a3..fc2ed30d5b6 100644 --- a/core/src/main/java/haveno/core/trade/TradeManager.java +++ b/core/src/main/java/haveno/core/trade/TradeManager.java @@ -747,7 +747,7 @@ else if (request.getTakerNodeAddress().equals(p2PService.getNetworkNode().getNod } private void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) { - log.info("TradeManager handling InitMultisigRequest for tradeId={}, sender={}, uid={}", request.getOfferId(), sender, request.getUid()); + log.info("TradeManager handling InitMultisigRequest for tradeId={}, sender={}, uid={}", request.getOfferId(), sender, request.getUid()); try { Validator.nonEmptyStringOf(request.getOfferId()); @@ -766,7 +766,7 @@ private void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress } private void handleSignContractRequest(SignContractRequest request, NodeAddress sender) { - log.info("TradeManager handling SignContractRequest for tradeId={}, sender={}, uid={}", request.getOfferId(), sender, request.getUid()); + log.info("TradeManager handling SignContractRequest for tradeId={}, sender={}, uid={}", request.getOfferId(), sender, request.getUid()); try { Validator.nonEmptyStringOf(request.getOfferId()); diff --git a/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java b/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java index 98b3f1ab0dc..b25c6c68d14 100644 --- a/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java @@ -43,7 +43,7 @@ public void onMailboxMessage(TradeMessage message, NodeAddress peer) { /////////////////////////////////////////////////////////////////////////////////////////// public void handleInitTradeRequest(InitTradeRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) { - System.out.println("ArbitratorProtocol.handleInitTradeRequest()"); + log.info(TradeProtocol.LOG_HIGHLIGHT + "handleInitTradeRequest() for {} {}", trade.getClass().getSimpleName(), trade.getShortId()); ThreadUtils.execute(() -> { synchronized (trade.getLock()) { latchTrade(); @@ -78,7 +78,7 @@ public void handleSignContractResponse(SignContractResponse message, NodeAddress } public void handleDepositRequest(DepositRequest request, NodeAddress sender) { - System.out.println("ArbitratorProtocol.handleDepositRequest() " + trade.getId()); + log.info(TradeProtocol.LOG_HIGHLIGHT + "handleDepositRequest() for {} {}", trade.getClass().getSimpleName(), trade.getShortId()); ThreadUtils.execute(() -> { synchronized (trade.getLock()) { latchTrade(); diff --git a/core/src/main/java/haveno/core/trade/protocol/BuyerProtocol.java b/core/src/main/java/haveno/core/trade/protocol/BuyerProtocol.java index 06e1eead7c0..4302f6db6fb 100644 --- a/core/src/main/java/haveno/core/trade/protocol/BuyerProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/BuyerProtocol.java @@ -119,7 +119,7 @@ public void handleSignContractResponse(SignContractResponse response, NodeAddres /////////////////////////////////////////////////////////////////////////////////////////// public void onPaymentSent(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { - System.out.println("BuyerProtocol.onPaymentSent()"); + log.info(TradeProtocol.LOG_HIGHLIGHT + "BuyerProtocol.onPaymentSent() for {} {}", trade.getClass().getSimpleName(), trade.getShortId()); ThreadUtils.execute(() -> { synchronized (trade.getLock()) { latchTrade(); diff --git a/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java b/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java index 54aaa65d37b..8521174ca8c 100644 --- a/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java +++ b/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java @@ -158,12 +158,15 @@ public class ProcessModel implements Model, PersistablePayload { @Getter @Setter private long tradeProtocolErrorHeight; + @Getter + @Setter + private boolean importMultisigHexScheduled; // We want to indicate the user the state of the message delivery of the // PaymentSentMessage. As well we do an automatic re-send in case it was not ACKed yet. // To enable that even after restart we persist the state. @Setter - private ObjectProperty paymentSentMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED); + private ObjectProperty paymentSentMessageStatePropertySeller = new SimpleObjectProperty<>(MessageState.UNDEFINED); @Setter private ObjectProperty paymentSentMessageStatePropertyArbitrator = new SimpleObjectProperty<>(MessageState.UNDEFINED); private ObjectProperty paymentAccountDecryptedProperty = new SimpleObjectProperty<>(false); @@ -203,11 +206,12 @@ public protobuf.ProcessModel toProtoMessage() { .setPubKeyRing(pubKeyRing.toProtoMessage()) .setUseSavingsWallet(useSavingsWallet) .setFundsNeededForTrade(fundsNeededForTrade) - .setPaymentSentMessageState(paymentSentMessageStateProperty.get().name()) + .setPaymentSentMessageStateSeller(paymentSentMessageStatePropertySeller.get().name()) .setPaymentSentMessageStateArbitrator(paymentSentMessageStatePropertyArbitrator.get().name()) .setBuyerPayoutAmountFromMediation(buyerPayoutAmountFromMediation) .setSellerPayoutAmountFromMediation(sellerPayoutAmountFromMediation) - .setTradeProtocolErrorHeight(tradeProtocolErrorHeight); + .setTradeProtocolErrorHeight(tradeProtocolErrorHeight) + .setImportMultisigHexScheduled(importMultisigHexScheduled); Optional.ofNullable(maker).ifPresent(e -> builder.setMaker((protobuf.TradePeer) maker.toProtoMessage())); Optional.ofNullable(taker).ifPresent(e -> builder.setTaker((protobuf.TradePeer) taker.toProtoMessage())); Optional.ofNullable(arbitrator).ifPresent(e -> builder.setArbitrator((protobuf.TradePeer) arbitrator.toProtoMessage())); @@ -231,6 +235,7 @@ public static ProcessModel fromProto(protobuf.ProcessModel proto, CoreProtoResol processModel.setBuyerPayoutAmountFromMediation(proto.getBuyerPayoutAmountFromMediation()); processModel.setSellerPayoutAmountFromMediation(proto.getSellerPayoutAmountFromMediation()); processModel.setTradeProtocolErrorHeight(proto.getTradeProtocolErrorHeight()); + processModel.setImportMultisigHexScheduled(proto.getImportMultisigHexScheduled()); // nullable processModel.setPayoutTxSignature(ProtoUtil.byteArrayOrNullFromProto(proto.getPayoutTxSignature())); @@ -240,9 +245,9 @@ public static ProcessModel fromProto(protobuf.ProcessModel proto, CoreProtoResol processModel.setTradeFeeAddress(ProtoUtil.stringOrNullFromProto(proto.getTradeFeeAddress())); processModel.setMultisigAddress(ProtoUtil.stringOrNullFromProto(proto.getMultisigAddress())); - String paymentSentMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageState()); - MessageState paymentSentMessageState = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateString); - processModel.setPaymentSentMessageState(paymentSentMessageState); + String paymentSentMessageStateSellerString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageStateSeller()); + MessageState paymentSentMessageStateSeller = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateSellerString); + processModel.setPaymentSentMessageStateSeller(paymentSentMessageStateSeller); String paymentSentMessageStateArbitratorString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageStateArbitrator()); MessageState paymentSentMessageStateArbitrator = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateArbitratorString); @@ -274,11 +279,11 @@ public NodeAddress getMyNodeAddress() { return getP2PService().getAddress(); } - void setPaymentSentAckMessage(AckMessage ackMessage) { + void setPaymentSentAckMessageSeller(AckMessage ackMessage) { MessageState messageState = ackMessage.isSuccess() ? MessageState.ACKNOWLEDGED : MessageState.FAILED; - setPaymentSentMessageState(messageState); + setPaymentSentMessageStateSeller(messageState); } void setPaymentSentAckMessageArbitrator(AckMessage ackMessage) { @@ -288,8 +293,8 @@ void setPaymentSentAckMessageArbitrator(AckMessage ackMessage) { setPaymentSentMessageStateArbitrator(messageState); } - public void setPaymentSentMessageState(MessageState paymentSentMessageStateProperty) { - this.paymentSentMessageStateProperty.set(paymentSentMessageStateProperty); + public void setPaymentSentMessageStateSeller(MessageState paymentSentMessageStateProperty) { + this.paymentSentMessageStatePropertySeller.set(paymentSentMessageStateProperty); if (tradeManager != null) { tradeManager.requestPersistence(); } @@ -302,6 +307,14 @@ public void setPaymentSentMessageStateArbitrator(MessageState paymentSentMessage } } + public boolean isPaymentSentMessageAckedBySeller() { + return paymentSentMessageStatePropertySeller.get() == MessageState.ACKNOWLEDGED; + } + + public boolean isPaymentSentMessageAckedByArbitrator() { + return paymentSentMessageStatePropertyArbitrator.get() == MessageState.ACKNOWLEDGED; + } + void setDepositTxSentAckMessage(AckMessage ackMessage) { MessageState messageState = ackMessage.isSuccess() ? MessageState.ACKNOWLEDGED : diff --git a/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java b/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java index 4de8fa0d6a8..2b7f45877a1 100644 --- a/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java @@ -115,7 +115,7 @@ public void handleSignContractResponse(SignContractResponse response, NodeAddres /////////////////////////////////////////////////////////////////////////////////////////// public void onPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { - log.info("SellerProtocol.onPaymentReceived()"); + log.info(TradeProtocol.LOG_HIGHLIGHT + "SellerProtocol.onPaymentReceived() for {} {}", trade.getClass().getSimpleName(), trade.getShortId()); ThreadUtils.execute(() -> { synchronized (trade.getLock()) { latchTrade(); diff --git a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java index 684dfb7c628..08c99570f6a 100644 --- a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java @@ -96,6 +96,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D private static final String TIMEOUT_REACHED = "Timeout reached."; public static final int MAX_ATTEMPTS = 5; // max attempts to create txs and other wallet functions public static final long REPROCESS_DELAY_MS = 5000; + public static final String LOG_HIGHLIGHT = "\u001B[0m"; // terminal default protected final ProcessModel processModel; protected final Trade trade; @@ -106,6 +107,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D protected ErrorMessageHandler errorMessageHandler; private boolean depositsConfirmedTasksCalled; + private int reprocessPaymentSentMessageCount; private int reprocessPaymentReceivedMessageCount; /////////////////////////////////////////////////////////////////////////////////////////// @@ -124,12 +126,12 @@ public TradeProtocol(Trade trade) { protected void onTradeMessage(TradeMessage message, NodeAddress peerNodeAddress) { log.info("Received {} as TradeMessage from {} with tradeId {} and uid {}", message.getClass().getSimpleName(), peerNodeAddress, message.getOfferId(), message.getUid()); - ThreadUtils.execute(() -> handle(message, peerNodeAddress), trade.getId()); + handle(message, peerNodeAddress); } protected void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress) { log.info("Received {} as MailboxMessage from {} with tradeId {} and uid {}", message.getClass().getSimpleName(), peerNodeAddress, message.getOfferId(), message.getUid()); - ThreadUtils.execute(() -> handle(message, peerNodeAddress), trade.getId()); + handle(message, peerNodeAddress); } private void handle(TradeMessage message, NodeAddress peerNodeAddress) { @@ -279,6 +281,22 @@ public void maybeSendDepositsConfirmedMessages() { }, trade.getId()); } + public void maybeReprocessPaymentSentMessage(boolean reprocessOnError) { + if (trade.isShutDownStarted()) return; + ThreadUtils.execute(() -> { + synchronized (trade.getLock()) { + + // skip if no need to reprocess + if (trade.isBuyer() || trade.getBuyer().getPaymentSentMessage() == null || trade.getState().ordinal() >= Trade.State.BUYER_SENT_PAYMENT_SENT_MSG.ordinal()) { + return; + } + + log.warn("Reprocessing payment sent message for {} {}", trade.getClass().getSimpleName(), trade.getId()); + handle(trade.getBuyer().getPaymentSentMessage(), trade.getBuyer().getPaymentSentMessage().getSenderNodeAddress(), reprocessOnError); + } + }, trade.getId()); + } + public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) { if (trade.isShutDownStarted()) return; ThreadUtils.execute(() -> { @@ -296,7 +314,7 @@ public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) { } public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) { - System.out.println(getClass().getSimpleName() + ".handleInitMultisigRequest() for " + trade.getClass().getSimpleName() + " " + trade.getShortId()); + log.info(LOG_HIGHLIGHT + "handleInitMultisigRequest() for " + trade.getClass().getSimpleName() + " " + trade.getShortId() + " from " + sender); trade.addInitProgressStep(); ThreadUtils.execute(() -> { synchronized (trade.getLock()) { @@ -333,7 +351,7 @@ public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress s } public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) { - System.out.println(getClass().getSimpleName() + ".handleSignContractRequest() for " + trade.getClass().getSimpleName() + " " + trade.getShortId()); + log.info(LOG_HIGHLIGHT + "handleSignContractRequest() for " + trade.getClass().getSimpleName() + " " + trade.getShortId() + " from " + sender); ThreadUtils.execute(() -> { synchronized (trade.getLock()) { @@ -376,7 +394,7 @@ public void handleSignContractRequest(SignContractRequest message, NodeAddress s } public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) { - System.out.println(getClass().getSimpleName() + ".handleSignContractResponse() for " + trade.getClass().getSimpleName() + " " + trade.getShortId()); + log.info(LOG_HIGHLIGHT + "handleSignContractResponse() for " + trade.getClass().getSimpleName() + " " + trade.getShortId() + " from " + sender); trade.addInitProgressStep(); ThreadUtils.execute(() -> { synchronized (trade.getLock()) { @@ -422,7 +440,7 @@ public void handleSignContractResponse(SignContractResponse message, NodeAddress } public void handleDepositResponse(DepositResponse response, NodeAddress sender) { - System.out.println(getClass().getSimpleName() + ".handleDepositResponse() for " + trade.getClass().getSimpleName() + " " + trade.getShortId()); + log.info(LOG_HIGHLIGHT + "handleDepositResponse() for " + trade.getClass().getSimpleName() + " " + trade.getShortId() + " from " + sender); trade.addInitProgressStep(); ThreadUtils.execute(() -> { synchronized (trade.getLock()) { @@ -452,7 +470,7 @@ public void handleDepositResponse(DepositResponse response, NodeAddress sender) } public void handle(DepositsConfirmedMessage message, NodeAddress sender) { - System.out.println(getClass().getSimpleName() + ".handle(DepositsConfirmedMessage) from " + sender + " for " + trade.getClass().getSimpleName() + " " + trade.getShortId()); + log.info(LOG_HIGHLIGHT + "handle(DepositsConfirmedMessage) for " + trade.getClass().getSimpleName() + " " + trade.getShortId() + " from " + sender); if (!trade.isInitialized() || trade.isShutDown()) return; ThreadUtils.execute(() -> { synchronized (trade.getLock()) { @@ -481,7 +499,25 @@ public void handle(DepositsConfirmedMessage message, NodeAddress sender) { // received by seller and arbitrator protected void handle(PaymentSentMessage message, NodeAddress peer) { - System.out.println(getClass().getSimpleName() + ".handle(PaymentSentMessage) for " + trade.getClass().getSimpleName() + " " + trade.getShortId()); + handle(message, peer, true); + } + + // received by seller and arbitrator + protected void handle(PaymentSentMessage message, NodeAddress peer, boolean reprocessOnError) { + log.info(LOG_HIGHLIGHT + "handle(PaymentSentMessage) for " + trade.getClass().getSimpleName() + " " + trade.getShortId() + " from " + peer); + + // validate signature + try { + HavenoUtils.verifyPaymentSentMessage(trade, message); + } catch (Throwable t) { + log.warn("Ignoring PaymentSentMessage with invalid signature for {} {}, error={}", trade.getClass().getSimpleName(), trade.getId(), t.getMessage()); + return; + } + + // save message for reprocessing + trade.getBuyer().setPaymentSentMessage(message); + trade.requestPersistence(); + if (!trade.isInitialized() || trade.isShutDown()) return; if (!(trade instanceof SellerTrade || trade instanceof ArbitratorTrade)) { log.warn("Ignoring PaymentSentMessage since not seller or arbitrator"); @@ -521,7 +557,19 @@ protected void handle(PaymentSentMessage message, NodeAddress peer) { handleTaskRunnerSuccess(peer, message); }, (errorMessage) -> { - handleTaskRunnerFault(peer, message, errorMessage); + log.warn("Error processing payment sent message: " + errorMessage); + processModel.getTradeManager().requestPersistence(); + + // schedule to reprocess message unless deleted + if (trade.getBuyer().getPaymentSentMessage() != null) { + UserThread.runAfter(() -> { + reprocessPaymentSentMessageCount++; + maybeReprocessPaymentSentMessage(reprocessOnError); + }, trade.getReprocessDelayInSeconds(reprocessPaymentSentMessageCount)); + } else { + handleTaskRunnerFault(peer, message, errorMessage); // otherwise send nack + } + unlatchTrade(); }))) .executeTasks(true); awaitTradeLatch(); @@ -535,7 +583,20 @@ protected void handle(PaymentReceivedMessage message, NodeAddress peer) { } private void handle(PaymentReceivedMessage message, NodeAddress peer, boolean reprocessOnError) { - System.out.println(getClass().getSimpleName() + ".handle(PaymentReceivedMessage) for " + trade.getClass().getSimpleName() + " " + trade.getShortId()); + log.info(LOG_HIGHLIGHT + "handle(PaymentReceivedMessage) for " + trade.getClass().getSimpleName() + " " + trade.getShortId() + " from " + peer); + + // validate signature + try { + HavenoUtils.verifyPaymentReceivedMessage(trade, message); + } catch (Throwable t) { + log.warn("Ignoring PaymentReceivedMessage with invalid signature for {} {}, error={}", trade.getClass().getSimpleName(), trade.getId(), t.getMessage()); + return; + } + + // save message for reprocessing + trade.getSeller().setPaymentReceivedMessage(message); + trade.requestPersistence(); + if (!trade.isInitialized() || trade.isShutDown()) return; ThreadUtils.execute(() -> { if (!(trade instanceof BuyerTrade || trade instanceof ArbitratorTrade)) { @@ -652,11 +713,12 @@ private void onAckMessage(AckMessage ackMessage, NodeAddress sender) { // handle ack for PaymentSentMessage, which automatically re-sends if not ACKed in a certain time if (ackMessage.getSourceMsgClassName().equals(PaymentSentMessage.class.getSimpleName())) { if (trade.getTradePeer(sender) == trade.getSeller()) { - processModel.setPaymentSentAckMessage(ackMessage); + processModel.setPaymentSentAckMessageSeller(ackMessage); trade.setStateIfValidTransitionTo(Trade.State.SELLER_RECEIVED_PAYMENT_SENT_MSG); processModel.getTradeManager().requestPersistence(); } else if (trade.getTradePeer(sender) == trade.getArbitrator()) { processModel.setPaymentSentAckMessageArbitrator(ackMessage); + processModel.getTradeManager().requestPersistence(); } else if (!ackMessage.isSuccess()) { String err = "Received AckMessage with error state for " + ackMessage.getSourceMsgClassName() + " from "+ sender + " with tradeId " + trade.getId() + " and errorMessage=" + ackMessage.getErrorMessage(); log.warn(err); diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java index f060effe78b..bc064399a37 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java @@ -170,7 +170,7 @@ private void cleanup() { timer.stop(); } if (listener != null) { - processModel.getPaymentSentMessageStateProperty().removeListener(listener); + processModel.getPaymentSentMessageStatePropertySeller().removeListener(listener); } } @@ -194,8 +194,8 @@ private void tryToSendAgainLater() { if (resendCounter == 0) { listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue); - processModel.getPaymentSentMessageStateProperty().addListener(listener); - onMessageStateChange(processModel.getPaymentSentMessageStateProperty().get()); + processModel.getPaymentSentMessageStatePropertySeller().addListener(listener); + onMessageStateChange(processModel.getPaymentSentMessageStatePropertySeller().get()); } // first re-send is after 2 minutes, then increase the delay exponentially diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToArbitrator.java b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToArbitrator.java index cc4113e3422..cd3098737af 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToArbitrator.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToArbitrator.java @@ -18,7 +18,6 @@ package haveno.core.trade.protocol.tasks; import haveno.common.taskrunner.TaskRunner; -import haveno.core.network.MessageState; import haveno.core.trade.Trade; import haveno.core.trade.protocol.TradePeer; import lombok.EqualsAndHashCode; @@ -59,6 +58,6 @@ protected void setStateArrived() { @Override protected boolean isAckedByReceiver() { - return trade.getProcessModel().getPaymentSentMessageStatePropertyArbitrator().get() == MessageState.ACKNOWLEDGED; + return trade.getProcessModel().isPaymentSentMessageAckedByArbitrator(); } } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToSeller.java b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToSeller.java index caf402be0a2..825220d5b4a 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToSeller.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToSeller.java @@ -40,25 +40,25 @@ protected TradePeer getReceiver() { @Override protected void setStateSent() { - trade.getProcessModel().setPaymentSentMessageState(MessageState.SENT); + trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.SENT); super.setStateSent(); } @Override protected void setStateArrived() { - trade.getProcessModel().setPaymentSentMessageState(MessageState.ARRIVED); + trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.ARRIVED); super.setStateArrived(); } @Override protected void setStateStoredInMailbox() { - trade.getProcessModel().setPaymentSentMessageState(MessageState.STORED_IN_MAILBOX); + trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.STORED_IN_MAILBOX); super.setStateStoredInMailbox(); } @Override protected void setStateFault() { - trade.getProcessModel().setPaymentSentMessageState(MessageState.FAILED); + trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.FAILED); super.setStateFault(); } @@ -72,6 +72,6 @@ protected void onFault(String errorMessage, TradeMessage message) { @Override protected boolean isAckedByReceiver() { - return trade.getState().ordinal() >= Trade.State.SELLER_RECEIVED_PAYMENT_SENT_MSG.ordinal(); + return trade.getProcessModel().isPaymentSentMessageAckedBySeller(); } } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessDepositsConfirmedMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessDepositsConfirmedMessage.java index c11df74fae6..7e0c85af2da 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessDepositsConfirmedMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessDepositsConfirmedMessage.java @@ -18,7 +18,6 @@ package haveno.core.trade.protocol.tasks; -import haveno.common.ThreadUtils; import haveno.common.taskrunner.TaskRunner; import haveno.core.trade.Trade; import haveno.core.trade.messages.DepositsConfirmedMessage; @@ -63,17 +62,7 @@ protected void run() { // update multisig hex if (sender.getUpdatedMultisigHex() == null) { sender.setUpdatedMultisigHex(request.getUpdatedMultisigHex()); - - // try to import multisig hex (retry later) - if (!trade.isPayoutPublished()) { - ThreadUtils.submitToPool(() -> { - try { - trade.importMultisigHex(); - } catch (Exception e) { - log.warn("Error importing multisig hex on deposits confirmed for trade " + trade.getId() + ": " + e.getMessage() + "\n", e); - } - }); - } + trade.scheduleImportMultisigHex(); } // persist diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessPaymentReceivedMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessPaymentReceivedMessage.java index 2ce29828a4a..f1016f3c618 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessPaymentReceivedMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessPaymentReceivedMessage.java @@ -80,9 +80,6 @@ protected void run() { return; } - // save message for reprocessing - trade.getSeller().setPaymentReceivedMessage(message); - // set state trade.getSeller().setUpdatedMultisigHex(message.getUpdatedMultisigHex()); trade.getBuyer().setAccountAgeWitness(message.getBuyerAccountAgeWitness()); diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessPaymentSentMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessPaymentSentMessage.java index 93d1ce520aa..de7d949ade8 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessPaymentSentMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessPaymentSentMessage.java @@ -48,7 +48,6 @@ protected void run() { trade.getBuyer().setNodeAddress(processModel.getTempTradePeerNodeAddress()); // update state from message - trade.getBuyer().setPaymentSentMessage(message); trade.getBuyer().setUpdatedMultisigHex(message.getUpdatedMultisigHex()); trade.getSeller().setAccountAgeWitness(message.getSellerAccountAgeWitness()); String counterCurrencyTxId = message.getCounterCurrencyTxId(); diff --git a/core/src/main/java/haveno/core/xmr/setup/MoneroWalletRpcManager.java b/core/src/main/java/haveno/core/xmr/setup/MoneroWalletRpcManager.java index c993ebd1816..1bb1e3500ec 100644 --- a/core/src/main/java/haveno/core/xmr/setup/MoneroWalletRpcManager.java +++ b/core/src/main/java/haveno/core/xmr/setup/MoneroWalletRpcManager.java @@ -135,7 +135,7 @@ public void stopInstance(MoneroWalletRpc walletRpc, String path, boolean force) // stop process String pid = walletRpc.getProcess() == null ? null : String.valueOf(walletRpc.getProcess().pid()); - log.info("Stopping MoneroWalletRpc path={}, port={}, pid={}", path, port, pid); + log.info("Stopping MoneroWalletRpc path={}, port={}, pid={}, force={}", path, port, pid, force); walletRpc.stopProcess(force); } diff --git a/daemon/src/main/java/haveno/daemon/grpc/GrpcOffersService.java b/daemon/src/main/java/haveno/daemon/grpc/GrpcOffersService.java index 84443da4d55..485ff38ca82 100644 --- a/daemon/src/main/java/haveno/daemon/grpc/GrpcOffersService.java +++ b/daemon/src/main/java/haveno/daemon/grpc/GrpcOffersService.java @@ -208,7 +208,7 @@ final Optional rateMeteringInterceptor() { put(getGetOffersMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 30 : 3, SECONDS)); put(getGetMyOffersMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 30 : 3, Config.baseCurrencyNetwork().isTestnet() ? SECONDS : MINUTES)); put(getPostOfferMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 30 : 3, Config.baseCurrencyNetwork().isTestnet() ? SECONDS : MINUTES)); - put(getCancelOfferMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 10 : 3, Config.baseCurrencyNetwork().isTestnet() ? SECONDS : MINUTES)); + put(getCancelOfferMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 30 : 3, Config.baseCurrencyNetwork().isTestnet() ? SECONDS : MINUTES)); }} ))); } diff --git a/daemon/src/main/java/haveno/daemon/grpc/GrpcTradesService.java b/daemon/src/main/java/haveno/daemon/grpc/GrpcTradesService.java index 286fccf51aa..0a5d2d39d2a 100644 --- a/daemon/src/main/java/haveno/daemon/grpc/GrpcTradesService.java +++ b/daemon/src/main/java/haveno/daemon/grpc/GrpcTradesService.java @@ -252,14 +252,14 @@ final Optional rateMeteringInterceptor() { .or(() -> Optional.of(CallRateMeteringInterceptor.valueOf( new HashMap<>() {{ put(getGetTradeMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 30 : 1, SECONDS)); - put(getGetTradesMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 10 : 1, SECONDS)); - put(getTakeOfferMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 20 : 3, Config.baseCurrencyNetwork().isTestnet() ? SECONDS : MINUTES)); - put(getConfirmPaymentSentMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 10 : 3, Config.baseCurrencyNetwork().isTestnet() ? SECONDS : MINUTES)); - put(getConfirmPaymentReceivedMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 10 : 3, Config.baseCurrencyNetwork().isTestnet() ? SECONDS : MINUTES)); - put(getCompleteTradeMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 10 : 3, Config.baseCurrencyNetwork().isTestnet() ? SECONDS : MINUTES)); + put(getGetTradesMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 30 : 1, SECONDS)); + put(getTakeOfferMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 30 : 3, Config.baseCurrencyNetwork().isTestnet() ? SECONDS : MINUTES)); + put(getConfirmPaymentSentMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 30 : 3, Config.baseCurrencyNetwork().isTestnet() ? SECONDS : MINUTES)); + put(getConfirmPaymentReceivedMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 30 : 3, Config.baseCurrencyNetwork().isTestnet() ? SECONDS : MINUTES)); + put(getCompleteTradeMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 30 : 3, Config.baseCurrencyNetwork().isTestnet() ? SECONDS : MINUTES)); put(getWithdrawFundsMethod().getFullMethodName(), new GrpcCallRateMeter(3, MINUTES)); - put(getGetChatMessagesMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 10 : 4, Config.baseCurrencyNetwork().isTestnet() ? SECONDS : MINUTES)); - put(getSendChatMessageMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 10 : 4, Config.baseCurrencyNetwork().isTestnet() ? SECONDS : MINUTES)); + put(getGetChatMessagesMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 30 : 4, Config.baseCurrencyNetwork().isTestnet() ? SECONDS : MINUTES)); + put(getSendChatMessageMethod().getFullMethodName(), new GrpcCallRateMeter(Config.baseCurrencyNetwork().isTestnet() ? 30 : 4, Config.baseCurrencyNetwork().isTestnet() ? SECONDS : MINUTES)); }} ))); } diff --git a/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/PendingTradesViewModel.java b/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/PendingTradesViewModel.java index 41881c58be3..b9936236c68 100644 --- a/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/PendingTradesViewModel.java +++ b/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/PendingTradesViewModel.java @@ -100,7 +100,7 @@ enum SellerState implements State { private final ObjectProperty buyerState = new SimpleObjectProperty<>(); private final ObjectProperty sellerState = new SimpleObjectProperty<>(); @Getter - private final ObjectProperty messageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED); + private final ObjectProperty paymentSentMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED); private Subscription tradeStateSubscription; private Subscription paymentAccountDecryptedSubscription; private Subscription payoutStateSubscription; @@ -186,7 +186,7 @@ public void onSelectedItemChanged(PendingTradesListItem selectedItem) { if (messageStateSubscription != null) { messageStateSubscription.unsubscribe(); - messageStateProperty.set(MessageState.UNDEFINED); + paymentSentMessageStateProperty.set(MessageState.UNDEFINED); } if (selectedItem != null) { @@ -200,7 +200,7 @@ public void onSelectedItemChanged(PendingTradesListItem selectedItem) { payoutStateSubscription = EasyBind.subscribe(trade.payoutStateProperty(), state -> { onPayoutStateChanged(state); }); - messageStateSubscription = EasyBind.subscribe(trade.getProcessModel().getPaymentSentMessageStateProperty(), this::onMessageStateChanged); + messageStateSubscription = EasyBind.subscribe(trade.getProcessModel().getPaymentSentMessageStatePropertySeller(), this::onPaymentSentMessageStateChanged); } } } @@ -215,8 +215,8 @@ private void refresh() { }); } - private void onMessageStateChanged(MessageState messageState) { - messageStateProperty.set(messageState); + private void onPaymentSentMessageStateChanged(MessageState messageState) { + paymentSentMessageStateProperty.set(messageState); } /////////////////////////////////////////////////////////////////////////////////////////// diff --git a/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/steps/buyer/BuyerStep3View.java b/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/steps/buyer/BuyerStep3View.java index 57fda229f3e..b28eda4a108 100644 --- a/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/steps/buyer/BuyerStep3View.java +++ b/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/steps/buyer/BuyerStep3View.java @@ -52,7 +52,7 @@ public BuyerStep3View(PendingTradesViewModel model) { public void activate() { super.activate(); - model.getMessageStateProperty().addListener(messageStateChangeListener); + model.getPaymentSentMessageStateProperty().addListener(messageStateChangeListener); updateMessageStateInfo(); } @@ -60,7 +60,7 @@ public void activate() { public void deactivate() { super.deactivate(); - model.getMessageStateProperty().removeListener(messageStateChangeListener); + model.getPaymentSentMessageStateProperty().removeListener(messageStateChangeListener); } @@ -87,7 +87,7 @@ protected String getInfoText() { } private void updateMessageStateInfo() { - MessageState messageState = model.getMessageStateProperty().get(); + MessageState messageState = model.getPaymentSentMessageStateProperty().get(); textFieldWithIcon.setText(Res.get("message.state." + messageState.name())); Label iconLabel = textFieldWithIcon.getIconLabel(); switch (messageState) { diff --git a/proto/src/main/proto/pb.proto b/proto/src/main/proto/pb.proto index 04c32ee8dcb..9bca09b668c 100644 --- a/proto/src/main/proto/pb.proto +++ b/proto/src/main/proto/pb.proto @@ -1568,7 +1568,7 @@ message ProcessModel { bytes payout_tx_signature = 4; bool use_savings_wallet = 5; int64 funds_needed_for_trade = 6; - string payment_sent_message_state = 7; + string payment_sent_message_state_seller = 7; string payment_sent_message_state_arbitrator = 8; bytes maker_signature = 9; TradePeer maker = 10; @@ -1581,6 +1581,7 @@ message ProcessModel { int64 seller_payout_amount_from_mediation = 17; int64 trade_protocol_error_height = 18; string trade_fee_address = 19; + bool import_multisig_hex_scheduled = 20; } message TradePeer {