Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve reliability to reprocess payment sent and received messages #1629

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
56 changes: 43 additions & 13 deletions core/src/main/java/haveno/core/trade/Trade.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
private static final long DELETE_AFTER_NUM_BLOCKS = 2; // if deposit requested but not published
private static final long EXTENDED_RPC_TIMEOUT = 600000; // 10 minutes
private static final long DELETE_AFTER_MS = TradeProtocol.TRADE_STEP_TIMEOUT_SECONDS;
private static final int NUM_CONFIRMATIONS_FOR_SCHEDULED_IMPORT = 10;
protected final Object pollLock = new Object();
protected static final Object importMultisigLock = new Object();
private boolean pollInProgress;
Expand Down Expand Up @@ -735,12 +736,17 @@ public void initialize(ProcessModelServiceProvider serviceProvider) {
// TODO: buyer's payment sent message state property became unsynced if shut down while awaiting ack from seller. fixed in v1.0.19 so this check can be removed?
if (isBuyer()) {
MessageState expectedState = getPaymentSentMessageState();
if (expectedState != null && expectedState != processModel.getPaymentSentMessageStateProperty().get()) {
log.warn("Updating unexpected payment sent message state for {} {}, expected={}, actual={}", getClass().getSimpleName(), getId(), expectedState, processModel.getPaymentSentMessageStateProperty().get());
processModel.getPaymentSentMessageStateProperty().set(expectedState);
if (expectedState != null && expectedState != processModel.getPaymentSentMessageStatePropertySeller().get()) {
log.warn("Updating unexpected payment sent message state for {} {}, expected={}, actual={}", getClass().getSimpleName(), getId(), expectedState, processModel.getPaymentSentMessageStatePropertySeller().get());
processModel.getPaymentSentMessageStatePropertySeller().set(expectedState);
}
}

// handle confirmations
walletHeight.addListener((observable, oldValue, newValue) -> {
importMultisigHexIfScheduled();
});

// trade is initialized
isInitialized = true;

Expand Down Expand Up @@ -1077,6 +1083,26 @@ public void importMultisigHexIfNeeded() {
}
}

public void scheduleImportMultisigHex() {
processModel.setImportMultisigHexScheduled(true);
requestPersistence();
}

private void importMultisigHexIfScheduled() {
if (!isInitialized || isShutDownStarted) return;
if (!isDepositsConfirmed() || getMaker().getDepositTx() == null) return;
if (walletHeight.get() - getMaker().getDepositTx().getHeight() < NUM_CONFIRMATIONS_FOR_SCHEDULED_IMPORT) return;
ThreadUtils.execute(() -> {
if (!isInitialized || isShutDownStarted) return;
synchronized (getLock()) {
if (processModel.isImportMultisigHexScheduled()) {
processModel.setImportMultisigHexScheduled(false);
ThreadUtils.submitToPool(() -> importMultisigHex());
}
}
}, getId());
}

public void importMultisigHex() {
synchronized (walletLock) {
synchronized (HavenoUtils.getDaemonLock()) { // lock on daemon because import calls full refresh
Expand All @@ -1089,10 +1115,10 @@ public void importMultisigHex() {
} catch (IllegalArgumentException | IllegalStateException e) {
throw e;
} catch (Exception e) {
log.warn("Failed to import multisig hex, tradeId={}, attempt={}/{}, error={}", getShortId(), i + 1, TradeProtocol.MAX_ATTEMPTS, e.getMessage());
handleWalletError(e, sourceConnection);
doPollWallet();
if (isPayoutPublished()) break;
log.warn("Failed to import multisig hex, tradeId={}, attempt={}/{}, error={}", getShortId(), i + 1, TradeProtocol.MAX_ATTEMPTS, e.getMessage());
if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e;
HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying
}
Expand Down Expand Up @@ -1141,6 +1167,9 @@ private void doImportMultisigHex() {
if (removed) wallet.importMultisigHex(multisigHexes.toArray(new String[0]));
if (wallet.isMultisigImportNeeded()) throw new IllegalStateException(errorMessage);
}

// remove scheduled import
processModel.setImportMultisigHexScheduled(false);
} catch (MoneroError e) {

// import multisig hex individually if one is invalid
Expand Down Expand Up @@ -2017,7 +2046,7 @@ public String getRole() {

public MessageState getPaymentSentMessageState() {
if (isPaymentReceived()) return MessageState.ACKNOWLEDGED;
if (processModel.getPaymentSentMessageStateProperty().get() == MessageState.ACKNOWLEDGED) return MessageState.ACKNOWLEDGED;
if (processModel.getPaymentSentMessageStatePropertySeller().get() == MessageState.ACKNOWLEDGED) return MessageState.ACKNOWLEDGED;
switch (state) {
case BUYER_SENT_PAYMENT_SENT_MSG:
return MessageState.SENT;
Expand Down Expand Up @@ -2345,7 +2374,12 @@ public boolean shouldPublishTradeStatistics() {
return tradeAmountTransferred();
}

public boolean tradeAmountTransferred() {

///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////

private boolean tradeAmountTransferred() {
return isPaymentReceived() || (getDisputeResult() != null && getDisputeResult().getWinner() == DisputeResult.Winner.SELLER);
}

Expand All @@ -2361,11 +2395,6 @@ private void doPublishTradeStatistics() {
}
}


///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////

// lazy initialization
private ObjectProperty<BigInteger> getAmountProperty() {
if (tradeAmountProperty == null)
Expand Down Expand Up @@ -2436,8 +2465,9 @@ private void tryInitPollingAux() {
if (!wasWalletSynced) trySyncWallet(true);
updatePollPeriod();

// reprocess pending payout messages
this.getProtocol().maybeReprocessPaymentReceivedMessage(false);
// reprocess pending messages
getProtocol().maybeReprocessPaymentSentMessage(false);
getProtocol().maybeReprocessPaymentReceivedMessage(false);
HavenoUtils.arbitrationManager.maybeReprocessDisputeClosedMessage(this, false);

startPolling();
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/haveno/core/trade/TradeManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ else if (request.getTakerNodeAddress().equals(p2PService.getNetworkNode().getNod
}

private void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) {
log.info("TradeManager handling InitMultisigRequest for tradeId={}, sender={}, uid={}", request.getOfferId(), sender, request.getUid());
log.info("TradeManager handling InitMultisigRequest for tradeId={}, sender={}, uid={}", request.getOfferId(), sender, request.getUid());

try {
Validator.nonEmptyStringOf(request.getOfferId());
Expand All @@ -766,7 +766,7 @@ private void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress
}

private void handleSignContractRequest(SignContractRequest request, NodeAddress sender) {
log.info("TradeManager handling SignContractRequest for tradeId={}, sender={}, uid={}", request.getOfferId(), sender, request.getUid());
log.info("TradeManager handling SignContractRequest for tradeId={}, sender={}, uid={}", request.getOfferId(), sender, request.getUid());

try {
Validator.nonEmptyStringOf(request.getOfferId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void onMailboxMessage(TradeMessage message, NodeAddress peer) {
///////////////////////////////////////////////////////////////////////////////////////////

public void handleInitTradeRequest(InitTradeRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) {
System.out.println("ArbitratorProtocol.handleInitTradeRequest()");
log.info(TradeProtocol.LOG_HIGHLIGHT + "handleInitTradeRequest() for {} {}", trade.getClass().getSimpleName(), trade.getShortId());
ThreadUtils.execute(() -> {
synchronized (trade.getLock()) {
latchTrade();
Expand Down Expand Up @@ -78,7 +78,7 @@ public void handleSignContractResponse(SignContractResponse message, NodeAddress
}

public void handleDepositRequest(DepositRequest request, NodeAddress sender) {
System.out.println("ArbitratorProtocol.handleDepositRequest() " + trade.getId());
log.info(TradeProtocol.LOG_HIGHLIGHT + "handleDepositRequest() for {} {}", trade.getClass().getSimpleName(), trade.getShortId());
ThreadUtils.execute(() -> {
synchronized (trade.getLock()) {
latchTrade();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void handleSignContractResponse(SignContractResponse response, NodeAddres
///////////////////////////////////////////////////////////////////////////////////////////

public void onPaymentSent(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
System.out.println("BuyerProtocol.onPaymentSent()");
log.info(TradeProtocol.LOG_HIGHLIGHT + "BuyerProtocol.onPaymentSent() for {} {}", trade.getClass().getSimpleName(), trade.getShortId());
ThreadUtils.execute(() -> {
synchronized (trade.getLock()) {
latchTrade();
Expand Down
33 changes: 23 additions & 10 deletions core/src/main/java/haveno/core/trade/protocol/ProcessModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,15 @@ public class ProcessModel implements Model, PersistablePayload {
@Getter
@Setter
private long tradeProtocolErrorHeight;
@Getter
@Setter
private boolean importMultisigHexScheduled;

// We want to indicate the user the state of the message delivery of the
// PaymentSentMessage. As well we do an automatic re-send in case it was not ACKed yet.
// To enable that even after restart we persist the state.
@Setter
private ObjectProperty<MessageState> paymentSentMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED);
private ObjectProperty<MessageState> paymentSentMessageStatePropertySeller = new SimpleObjectProperty<>(MessageState.UNDEFINED);
@Setter
private ObjectProperty<MessageState> paymentSentMessageStatePropertyArbitrator = new SimpleObjectProperty<>(MessageState.UNDEFINED);
private ObjectProperty<Boolean> paymentAccountDecryptedProperty = new SimpleObjectProperty<>(false);
Expand Down Expand Up @@ -203,11 +206,12 @@ public protobuf.ProcessModel toProtoMessage() {
.setPubKeyRing(pubKeyRing.toProtoMessage())
.setUseSavingsWallet(useSavingsWallet)
.setFundsNeededForTrade(fundsNeededForTrade)
.setPaymentSentMessageState(paymentSentMessageStateProperty.get().name())
.setPaymentSentMessageStateSeller(paymentSentMessageStatePropertySeller.get().name())
.setPaymentSentMessageStateArbitrator(paymentSentMessageStatePropertyArbitrator.get().name())
.setBuyerPayoutAmountFromMediation(buyerPayoutAmountFromMediation)
.setSellerPayoutAmountFromMediation(sellerPayoutAmountFromMediation)
.setTradeProtocolErrorHeight(tradeProtocolErrorHeight);
.setTradeProtocolErrorHeight(tradeProtocolErrorHeight)
.setImportMultisigHexScheduled(importMultisigHexScheduled);
Optional.ofNullable(maker).ifPresent(e -> builder.setMaker((protobuf.TradePeer) maker.toProtoMessage()));
Optional.ofNullable(taker).ifPresent(e -> builder.setTaker((protobuf.TradePeer) taker.toProtoMessage()));
Optional.ofNullable(arbitrator).ifPresent(e -> builder.setArbitrator((protobuf.TradePeer) arbitrator.toProtoMessage()));
Expand All @@ -231,6 +235,7 @@ public static ProcessModel fromProto(protobuf.ProcessModel proto, CoreProtoResol
processModel.setBuyerPayoutAmountFromMediation(proto.getBuyerPayoutAmountFromMediation());
processModel.setSellerPayoutAmountFromMediation(proto.getSellerPayoutAmountFromMediation());
processModel.setTradeProtocolErrorHeight(proto.getTradeProtocolErrorHeight());
processModel.setImportMultisigHexScheduled(proto.getImportMultisigHexScheduled());

// nullable
processModel.setPayoutTxSignature(ProtoUtil.byteArrayOrNullFromProto(proto.getPayoutTxSignature()));
Expand All @@ -240,9 +245,9 @@ public static ProcessModel fromProto(protobuf.ProcessModel proto, CoreProtoResol
processModel.setTradeFeeAddress(ProtoUtil.stringOrNullFromProto(proto.getTradeFeeAddress()));
processModel.setMultisigAddress(ProtoUtil.stringOrNullFromProto(proto.getMultisigAddress()));

String paymentSentMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageState());
MessageState paymentSentMessageState = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateString);
processModel.setPaymentSentMessageState(paymentSentMessageState);
String paymentSentMessageStateSellerString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageStateSeller());
MessageState paymentSentMessageStateSeller = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateSellerString);
processModel.setPaymentSentMessageStateSeller(paymentSentMessageStateSeller);

String paymentSentMessageStateArbitratorString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageStateArbitrator());
MessageState paymentSentMessageStateArbitrator = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateArbitratorString);
Expand Down Expand Up @@ -274,11 +279,11 @@ public NodeAddress getMyNodeAddress() {
return getP2PService().getAddress();
}

void setPaymentSentAckMessage(AckMessage ackMessage) {
void setPaymentSentAckMessageSeller(AckMessage ackMessage) {
MessageState messageState = ackMessage.isSuccess() ?
MessageState.ACKNOWLEDGED :
MessageState.FAILED;
setPaymentSentMessageState(messageState);
setPaymentSentMessageStateSeller(messageState);
}

void setPaymentSentAckMessageArbitrator(AckMessage ackMessage) {
Expand All @@ -288,8 +293,8 @@ void setPaymentSentAckMessageArbitrator(AckMessage ackMessage) {
setPaymentSentMessageStateArbitrator(messageState);
}

public void setPaymentSentMessageState(MessageState paymentSentMessageStateProperty) {
this.paymentSentMessageStateProperty.set(paymentSentMessageStateProperty);
public void setPaymentSentMessageStateSeller(MessageState paymentSentMessageStateProperty) {
this.paymentSentMessageStatePropertySeller.set(paymentSentMessageStateProperty);
if (tradeManager != null) {
tradeManager.requestPersistence();
}
Expand All @@ -302,6 +307,14 @@ public void setPaymentSentMessageStateArbitrator(MessageState paymentSentMessage
}
}

public boolean isPaymentSentMessageAckedBySeller() {
return paymentSentMessageStatePropertySeller.get() == MessageState.ACKNOWLEDGED;
}

public boolean isPaymentSentMessageAckedByArbitrator() {
return paymentSentMessageStatePropertyArbitrator.get() == MessageState.ACKNOWLEDGED;
}

void setDepositTxSentAckMessage(AckMessage ackMessage) {
MessageState messageState = ackMessage.isSuccess() ?
MessageState.ACKNOWLEDGED :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void handleSignContractResponse(SignContractResponse response, NodeAddres
///////////////////////////////////////////////////////////////////////////////////////////

public void onPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
log.info("SellerProtocol.onPaymentReceived()");
log.info(TradeProtocol.LOG_HIGHLIGHT + "SellerProtocol.onPaymentReceived() for {} {}", trade.getClass().getSimpleName(), trade.getShortId());
ThreadUtils.execute(() -> {
synchronized (trade.getLock()) {
latchTrade();
Expand Down
Loading
Loading