Skip to content

Commit

Permalink
preserve offers unless invalid #1115
Browse files Browse the repository at this point in the history
  • Loading branch information
woodser committed Jul 17, 2024
1 parent 06b0c20 commit d69dcae
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 75 deletions.
12 changes: 6 additions & 6 deletions core/src/main/java/haveno/core/offer/OpenOffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
public final class OpenOffer implements Tradable {

public enum State {
SCHEDULED,
PENDING,
AVAILABLE,
RESERVED,
CLOSED,
Expand Down Expand Up @@ -120,7 +120,7 @@ public OpenOffer(Offer offer, long triggerPrice, boolean reserveExactAmount) {
this.offer = offer;
this.triggerPrice = triggerPrice;
this.reserveExactAmount = reserveExactAmount;
state = State.SCHEDULED;
state = State.PENDING;
}

public OpenOffer(Offer offer, long triggerPrice, OpenOffer openOffer) {
Expand Down Expand Up @@ -165,8 +165,8 @@ private OpenOffer(Offer offer,
this.reserveTxHex = reserveTxHex;
this.reserveTxKey = reserveTxKey;

if (this.state == State.RESERVED)
setState(State.AVAILABLE);
// reset reserved state to available
if (this.state == State.RESERVED) setState(State.AVAILABLE);
}

@Override
Expand Down Expand Up @@ -232,8 +232,8 @@ public ReadOnlyObjectProperty<State> stateProperty() {
return stateProperty;
}

public boolean isScheduled() {
return state == State.SCHEDULED;
public boolean isPending() {
return state == State.PENDING;
}

public boolean isAvailable() {
Expand Down
125 changes: 69 additions & 56 deletions core/src/main/java/haveno/core/offer/OpenOfferManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
private static final long REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC = 30;
private static final long REPUBLISH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(30);
private static final long REFRESH_INTERVAL_MS = OfferPayload.TTL / 2;
private static final int MAX_PROCESS_ATTEMPTS = 5;
private static final int NUM_ATTEMPTS_THRESHOLD = 5; // process pending offer only on republish cycle after this many attempts

private final CoreContext coreContext;
private final KeyRing keyRing;
Expand Down Expand Up @@ -252,17 +252,17 @@ public void readPersisted(Runnable completeHandler) {

// read open offers
persistenceManager.readPersisted(persisted -> {
openOffers.setAll(persisted.getList());
openOffers.forEach(openOffer -> openOffer.getOffer().setPriceFeedService(priceFeedService));
openOffers.setAll(persisted.getList());
openOffers.forEach(openOffer -> openOffer.getOffer().setPriceFeedService(priceFeedService));

// read signed offers
signedOfferPersistenceManager.readPersisted(signedOfferPersisted -> {
signedOffers.setAll(signedOfferPersisted.getList());
completeHandler.run();
},
completeHandler);
},
completeHandler);
// read signed offers
signedOfferPersistenceManager.readPersisted(signedOfferPersisted -> {
signedOffers.setAll(signedOfferPersisted.getList());
completeHandler.run();
},
completeHandler);
},
completeHandler);
}

private synchronized void maybeInitializeKeyImagePoller() {
Expand Down Expand Up @@ -472,17 +472,17 @@ private void onBootstrapComplete() {
// .forEach(openOffer -> OfferUtil.getInvalidMakerFeeTxErrorMessage(openOffer.getOffer(), btcWalletService)
// .ifPresent(errorMsg -> invalidOffers.add(new Tuple2<>(openOffer, errorMsg))));

// process scheduled offers
processScheduledOffers((transaction) -> {}, (errorMessage) -> {
log.warn("Error processing unposted offers: " + errorMessage);
});
// process pending offers
processPendingOffers(false);

// register to process unposted offers on new block
// register to process pending offers on new block
xmrWalletService.addWalletListener(new MoneroWalletListener() {
@Override
public void onNewBlock(long height) {
processScheduledOffers((transaction) -> {}, (errorMessage) -> {
log.warn("Error processing unposted offers on new block {}: {}", height, errorMessage);

// process pending offers on new block a few times
processPendingOffers(true, (transaction) -> {}, (errorMessage) -> {
log.warn("Error processing pending offers on new block {}: {}", height, errorMessage);
});
}
});
Expand Down Expand Up @@ -549,16 +549,15 @@ public void placeOffer(Offer offer,
synchronized (processOffersLock) {
CountDownLatch latch = new CountDownLatch(1);
addOpenOffer(openOffer);
processUnpostedOffer(getOpenOffers(), openOffer, (transaction) -> {
processPendingOffer(getOpenOffers(), openOffer, (transaction) -> {
requestPersistence();
latch.countDown();
resultHandler.handleResult(transaction);
}, (errorMessage) -> {
if (openOffer.isCanceled()) latch.countDown();
else {
log.warn("Error processing unposted offer {}: {}", openOffer.getId(), errorMessage);
log.warn("Error processing pending offer {}: {}", openOffer.getId(), errorMessage);
doCancelOffer(openOffer);
offer.setErrorMessage(errorMessage);
latch.countDown();
errorMessageHandler.handleErrorMessage(errorMessage);
}
Expand All @@ -583,9 +582,11 @@ public void removeOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHa
public void activateOpenOffer(OpenOffer openOffer,
ResultHandler resultHandler,
ErrorMessageHandler errorMessageHandler) {
if (openOffer.isScheduled()) {
resultHandler.handleResult(); // ignore if scheduled
} else if (!offersToBeEdited.containsKey(openOffer.getId())) {
if (openOffer.isPending()) {
resultHandler.handleResult(); // ignore if pending
} else if (offersToBeEdited.containsKey(openOffer.getId())) {
errorMessageHandler.handleErrorMessage("You can't activate an offer that is currently edited.");
} else {
Offer offer = openOffer.getOffer();
offerBookService.activateOffer(offer,
() -> {
Expand All @@ -595,8 +596,6 @@ public void activateOpenOffer(OpenOffer openOffer,
resultHandler.handleResult();
},
errorMessageHandler);
} else {
errorMessageHandler.handleErrorMessage("You can't activate an offer that is currently edited.");
}
}

Expand Down Expand Up @@ -858,26 +857,35 @@ private void removeSignedOffers(String keyImage) {
// Place offer helpers
///////////////////////////////////////////////////////////////////////////////////////////

private void processScheduledOffers(TransactionResultHandler resultHandler, // TODO (woodser): transaction not needed with result handler
private void processPendingOffers(boolean skipOffersWithTooManyAttempts) {
processPendingOffers(skipOffersWithTooManyAttempts, (transaction) -> {}, (errorMessage) -> {
log.warn("Error processing pending offers: " + errorMessage);
});
}

private void processPendingOffers(boolean skipOffersWithTooManyAttempts,
TransactionResultHandler resultHandler, // TODO (woodser): transaction not needed with result handler
ErrorMessageHandler errorMessageHandler) {
ThreadUtils.execute(() -> {
synchronized (processOffersLock) {
List<String> errorMessages = new ArrayList<String>();
List<OpenOffer> openOffers = getOpenOffers();
for (OpenOffer scheduledOffer : openOffers) {
if (scheduledOffer.getState() != OpenOffer.State.SCHEDULED) continue;
for (OpenOffer pendingOffer : openOffers) {
if (pendingOffer.getState() != OpenOffer.State.PENDING) continue;
if (skipOffersWithTooManyAttempts && pendingOffer.getNumProcessingAttempts() > NUM_ATTEMPTS_THRESHOLD) continue; // skip offers with too many attempts
CountDownLatch latch = new CountDownLatch(1);
processUnpostedOffer(openOffers, scheduledOffer, (transaction) -> {
processPendingOffer(openOffers, pendingOffer, (transaction) -> {
latch.countDown();
}, errorMessage -> {
if (!scheduledOffer.isCanceled()) {
log.warn("Error processing unposted offer, offerId={}, attempt={}/{}, error={}", scheduledOffer.getId(), scheduledOffer.getNumProcessingAttempts(), MAX_PROCESS_ATTEMPTS, errorMessage);
if (scheduledOffer.getNumProcessingAttempts() >= MAX_PROCESS_ATTEMPTS) {
log.warn("Offer canceled after {} attempts, offerId={}, error={}", scheduledOffer.getNumProcessingAttempts(), scheduledOffer.getId(), errorMessage);
HavenoUtils.havenoSetup.getTopErrorMsg().set("Offer canceled after " + scheduledOffer.getNumProcessingAttempts() + " attempts. Please switch to a better Monero connection and try again.\n\nOffer ID: " + scheduledOffer.getId() + "\nError: " + errorMessage);
doCancelOffer(scheduledOffer);
}
if (!pendingOffer.isCanceled()) {
log.warn("Error processing pending offer, offerId={}, attempt={}, error={}", pendingOffer.getId(), pendingOffer.getNumProcessingAttempts(), errorMessage);
errorMessages.add(errorMessage);

// cancel offer if invalid
if (pendingOffer.getOffer().getState() == Offer.State.INVALID) {
log.warn("Canceling offer because it's invalid: {}", pendingOffer.getId());
doCancelOffer(pendingOffer);
}
}
latch.countDown();
});
Expand All @@ -890,7 +898,7 @@ private void processScheduledOffers(TransactionResultHandler resultHandler, // T
}, THREAD_ID);
}

private void processUnpostedOffer(List<OpenOffer> openOffers, OpenOffer openOffer, TransactionResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
private void processPendingOffer(List<OpenOffer> openOffers, OpenOffer openOffer, TransactionResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {

// skip if already processing
if (openOffer.isProcessing()) {
Expand All @@ -900,17 +908,18 @@ private void processUnpostedOffer(List<OpenOffer> openOffers, OpenOffer openOffe

// process offer
openOffer.setProcessing(true);
doProcessUnpostedOffer(openOffers, openOffer, (transaction) -> {
doProcessPendingOffer(openOffers, openOffer, (transaction) -> {
openOffer.setProcessing(false);
resultHandler.handleResult(transaction);
}, (errorMsg) -> {
openOffer.setProcessing(false);
openOffer.setNumProcessingAttempts(openOffer.getNumProcessingAttempts() + 1);
openOffer.getOffer().setErrorMessage(errorMsg);
errorMessageHandler.handleErrorMessage(errorMsg);
});
}

private void doProcessUnpostedOffer(List<OpenOffer> openOffers, OpenOffer openOffer, TransactionResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
private void doProcessPendingOffer(List<OpenOffer> openOffers, OpenOffer openOffer, TransactionResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
new Thread(() -> {
try {

Expand Down Expand Up @@ -1075,7 +1084,7 @@ private void setSplitOutputTx(OpenOffer openOffer, MoneroTxWallet splitOutputTx)
openOffer.setSplitOutputTxFee(splitOutputTx.getFee().longValueExact());
openOffer.setScheduledTxHashes(Arrays.asList(splitOutputTx.getHash()));
openOffer.setScheduledAmount(openOffer.getOffer().getAmountNeeded().toString());
openOffer.setState(OpenOffer.State.SCHEDULED);
openOffer.setState(OpenOffer.State.PENDING);
}

private void scheduleWithEarliestTxs(List<OpenOffer> openOffers, OpenOffer openOffer) {
Expand Down Expand Up @@ -1106,13 +1115,13 @@ private void scheduleWithEarliestTxs(List<OpenOffer> openOffers, OpenOffer openO
// schedule txs
openOffer.setScheduledTxHashes(scheduledTxHashes);
openOffer.setScheduledAmount(scheduledAmount.toString());
openOffer.setState(OpenOffer.State.SCHEDULED);
openOffer.setState(OpenOffer.State.PENDING);
}

private BigInteger getScheduledAmount(List<OpenOffer> openOffers) {
BigInteger scheduledAmount = BigInteger.ZERO;
for (OpenOffer openOffer : openOffers) {
if (openOffer.getState() != OpenOffer.State.SCHEDULED) continue;
if (openOffer.getState() != OpenOffer.State.PENDING) continue;
if (openOffer.getScheduledTxHashes() == null) continue;
List<MoneroTxWallet> fundingTxs = xmrWalletService.getTxs(openOffer.getScheduledTxHashes());
for (MoneroTxWallet fundingTx : fundingTxs) {
Expand All @@ -1129,7 +1138,7 @@ private BigInteger getScheduledAmount(List<OpenOffer> openOffers) {
private boolean isTxScheduledByOtherOffer(List<OpenOffer> openOffers, OpenOffer openOffer, String txHash) {
for (OpenOffer otherOffer : openOffers) {
if (otherOffer == openOffer) continue;
if (otherOffer.getState() != OpenOffer.State.SCHEDULED) continue;
if (otherOffer.getState() != OpenOffer.State.PENDING) continue;
if (otherOffer.getScheduledTxHashes() == null) continue;
for (String scheduledTxHash : otherOffer.getScheduledTxHashes()) {
if (txHash.equals(scheduledTxHash)) return true;
Expand Down Expand Up @@ -1732,25 +1741,29 @@ private void republishOffer(OpenOffer openOffer, @Nullable Runnable completeHand
});
} else {

// cancel and recreate offer
doCancelOffer(openOffer);
Offer updatedOffer = new Offer(openOffer.getOffer().getOfferPayload());
updatedOffer.setPriceFeedService(priceFeedService);
OpenOffer updatedOpenOffer = new OpenOffer(updatedOffer, openOffer.getTriggerPrice());
// reset offer state to pending
openOffer.getOffer().getOfferPayload().setArbitratorSignature(null);
openOffer.getOffer().getOfferPayload().setArbitratorSigner(null);
openOffer.getOffer().setState(Offer.State.UNKNOWN);
openOffer.setState(OpenOffer.State.PENDING);

// repost offer
// republish offer
synchronized (processOffersLock) {
CountDownLatch latch = new CountDownLatch(1);
addOpenOffer(updatedOpenOffer);
processUnpostedOffer(getOpenOffers(), updatedOpenOffer, (transaction) -> {
processPendingOffer(getOpenOffers(), openOffer, (transaction) -> {
requestPersistence();
latch.countDown();
if (completeHandler != null) completeHandler.run();
}, (errorMessage) -> {
if (!updatedOpenOffer.isCanceled()) {
log.warn("Error reposting offer {}: {}", updatedOpenOffer.getId(), errorMessage);
doCancelOffer(updatedOpenOffer);
updatedOffer.setErrorMessage(errorMessage);
if (!openOffer.isCanceled()) {
log.warn("Error republishing offer {}: {}", openOffer.getId(), errorMessage);
openOffer.getOffer().setErrorMessage(errorMessage);

// cancel offer if invalid
if (openOffer.getOffer().getState() == Offer.State.INVALID) {
log.warn("Canceling offer because it's invalid: {}", openOffer.getId());
doCancelOffer(openOffer);
}
}
latch.countDown();
if (completeHandler != null) completeHandler.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected void run() {
synchronized (XmrWalletService.WALLET_LOCK) {

// reset protocol timeout
verifyScheduled();
verifyPending();
model.getProtocol().startTimeoutTimer();

// collect relevant info
Expand Down Expand Up @@ -94,7 +94,7 @@ protected void run() {
}

// verify still open
verifyScheduled();
verifyPending();
if (reserveTx != null) break;
}
}
Expand All @@ -104,6 +104,7 @@ protected void run() {
model.getXmrWalletService().resetAddressEntriesForOpenOffer(offer.getId());
if (reserveTx != null) {
model.getXmrWalletService().thawOutputs(HavenoUtils.getInputKeyImages(reserveTx));
offer.getOfferPayload().setReserveTxKeyImages(null);
}

throw e;
Expand Down Expand Up @@ -131,7 +132,7 @@ protected void run() {
}
}

public void verifyScheduled() {
if (!model.getOpenOffer().isScheduled()) throw new RuntimeException("Offer " + model.getOpenOffer().getOffer().getId() + " is canceled");
public void verifyPending() {
if (!model.getOpenOffer().isPending()) throw new RuntimeException("Offer " + model.getOpenOffer().getOffer().getId() + " is canceled");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ public void onDirectMessage(DecryptedMessageWithPubKey decryptedMessageWithPubKe
model.getOpenOffer().getOffer().getOfferPayload().setArbitratorSigner(arbitratorNodeAddress);
model.getOpenOffer().getOffer().setState(Offer.State.OFFER_FEE_RESERVED);
resultHandler.handleResult();
} else {
errorMessageHandler.handleErrorMessage("Arbitrator nacked SignOfferRequest for offer " + request.getOfferId() + ": " + ackMessage.getErrorMessage());
}
} else {
model.getOpenOffer().getOffer().setState(Offer.State.INVALID);
errorMessageHandler.handleErrorMessage("Arbitrator nacked SignOfferRequest for offer " + request.getOfferId() + ": " + ackMessage.getErrorMessage());
}
}
};
model.getP2PService().addDecryptedDirectMessageListener(ackListener);
Expand All @@ -137,9 +138,9 @@ public void onFault(String errorMessage) {
log.warn("Arbitrator unavailable: address={}, error={}", arbitratorNodeAddress, errorMessage);
excludedArbitrators.add(arbitratorNodeAddress);

// check if offer still scheduled
if (!model.getOpenOffer().isScheduled()) {
errorMessageHandler.handleErrorMessage("Offer is no longer scheduled, offerId=" + model.getOpenOffer().getId());
// check if offer still pending
if (!model.getOpenOffer().isPending()) {
errorMessageHandler.handleErrorMessage("Offer is no longer pending, offerId=" + model.getOpenOffer().getId());
return;
}

Expand Down
Loading

0 comments on commit d69dcae

Please sign in to comment.