Skip to content

Commit

Permalink
Fixes all small bugs that were found after applying mutations
Browse files Browse the repository at this point in the history
  • Loading branch information
Tezzish committed Jan 24, 2025
1 parent 5d271cf commit 33a540a
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,6 @@ public ZyzzyvaReplica(String replicaId,
);
// we set this as a null commit certificate for view changes etc. this is the first instance the system is stable
this.getMessageLog().setMaxCC(startCC);

this.setRequestTimeoutId(this.setTimeout(
"requestTimeout",
() -> {
log.warning("Replica " + this.getId() + " didn't receive next request in time, init view change");
IHateThePrimaryMessage ihtpm = new IHateThePrimaryMessage(this.getViewNumber());
ihtpm.sign(this.getId());
this.broadcastMessage(ihtpm);
this.handleIHateThePrimaryMessage(this.getId(), ihtpm);
},
Duration.ofSeconds(300)
));
}

@Override
Expand Down Expand Up @@ -272,7 +260,7 @@ public void handleClientRequest(String clientId, Serializable request) {
this.calculateHistory(this.getHighestSequenceNumber() + 1, digest),
// digest
digest);
log.info("Replica " + this.getId() + " ordering request with sequence number " + orm.getSequenceNumber());
log.info("Replica " + this.getId() + " ordering request with sequence number " + orm.getSequenceNumber() + " and operation " + requestMessage.getOperation());
orm.sign(this.getId());
OrderedRequestMessageWrapper ormw = new OrderedRequestMessageWrapper(orm, requestMessage);
this.broadcastMessage(ormw);
Expand Down Expand Up @@ -366,12 +354,19 @@ public void handleOrderedRequestMessageWrapper(String sender, OrderedRequestMess
log.warning("Failed to clear forward to primary timeout, possibly because it's been triggered");
} catch (NullPointerException ignored) {
}
SpeculativeResponseWrapper response = this.executeOrderedRequest(ormw);
SpeculativeResponseWrapper srw = this.executeOrderedRequest(ormw);
if (ormw.getRequestMessage().getClientId().equals("Noop")) {
log.info("Received a noop");
return;
}
this.sendReplyToClient(ormw.getRequestMessage().getClientId(), response);
this.sendReplyToClient(ormw.getRequestMessage().getClientId(), srw);

// checkpointing
if (ormw.getOrderedRequest().getSequenceNumber() % this.getCP_INTERVAL() == 0) {
log.info("Replica " + this.getId() + " going to checkpoint");
this.broadcastMessage(srw.getSpecResponse());
this.handleSpeculativeResponse(this.getId(), srw.getSpecResponse());
}
}

/**
Expand All @@ -394,13 +389,6 @@ public SpeculativeResponseWrapper executeOrderedRequest(OrderedRequestMessageWra
// updates the request cache
this.getMessageLog().putResponseCache(clientId, ormw.getRequestMessage(), srw);

// checkpointing
if (ormw.getOrderedRequest().getSequenceNumber() % this.getCP_INTERVAL() == 0) {
log.info("Checkpointing");
this.broadcastMessage(srw.getSpecResponse());
this.handleSpeculativeResponse(this.getId(), srw.getSpecResponse());
}

return srw;
}

Expand Down Expand Up @@ -756,6 +744,7 @@ private void handleCommitMessage(String sender, CommitMessage commitMessage) {
log.warning("Received invalid commit certificate from " + sender);
return;
}
log.info("Received a commit certificate from " + sender + " with sequence number " + cc.getSequenceNumber());
// commit the operations
this.handleCommitCertificate(cc);
LocalCommitMessage lcm = new LocalCommitMessage(
Expand All @@ -764,6 +753,7 @@ private void handleCommitMessage(String sender, CommitMessage commitMessage) {
cc.getHistory(),
this.getId(),
sender);
log.info("Replica " + this.getId() + " sending a local commit message to " + sender);
lcm.sign(this.getId());
this.sendMessage(lcm, sender);
}
Expand Down Expand Up @@ -801,6 +791,7 @@ private void handleCommitCertificate(CommitCertificate cc) {
this.getMessageLog().setMaxCC(cc);
if (this.getCommitLog().getHighestSequenceNumber() > this.getMessageLog().getMaxCC().getSequenceNumber()) {
log.warning("Replica " + this.getId() + " has a higher sequence number in the commit log than the maxCC");
throw new IllegalStateException("Replica " + this.getId() + " has a higher sequence number in the commit log than the maxCC");
}
}

Expand Down Expand Up @@ -849,7 +840,7 @@ private boolean isValidCommitCertificate(CommitCertificate cc) {

// if the currentCC is not null and the sequence number is the same, we return true, this means we commit again?
if (currentCC != null && cc.getSequenceNumber() == currentCC.getSequenceNumber()) {
log.warning("Replica " + this.getId() +" received a commit certificate with the same sequence number");
log.warning("Replica " + this.getId() + " received a commit certificate with the same sequence number ");
return false;
}

Expand Down Expand Up @@ -994,21 +985,6 @@ private void commitToViewChange() {
log.warning("MaxCC is null");
}


// creates the CCs
// if (this.getMessageLog().getMaxCC() != null && this.getMessageLog().getMaxCC().getViewNumber() == this.getViewNumber()) {
// cc = this.getMessageLog().getMaxCC();
// }
// } else if (this.getMessageLog()
// .getViewConfirmMessages()
// .getOrDefault(this.getViewNumber(), new ArrayList<>())
// .size() >= this.faultsTolerated + 1) {
// cc = new ArrayList<>(this.getMessageLog().getViewConfirmMessages().get(this.getViewNumber()));
// } else {
// if(this.getMessageLog().getNewViewMessages().isEmpty()) {log.warning("New view messages is empty"); return;}
// cc = this.getMessageLog().getNewViewMessages().sequencedValues().getLast();
// }

if (this.getMessageLog().getCheckpointMessages().get(this.getMessageLog().getLastCheckpoint()) == null) {
log.warning("Checkpoint messages is null");
}
Expand All @@ -1032,7 +1008,7 @@ private void commitToViewChange() {
this.broadcastMessage(vcmw);
this.handleViewChangeMessageWrapper(this.getId(), vcmw);
}

log.info("Replica " + this.getId() + " committed to a view change with maxCC " + cc.getSequenceNumber() + ", last checkpoint " + this.getMessageLog().getLastCheckpoint() + " and highest sequence number " + this.getHighestSequenceNumber());
this.setDisgruntled(true);
}

Expand Down Expand Up @@ -1450,31 +1426,7 @@ public void handleViewConfirmMessage(String sender, ViewConfirmMessage vcm) {
private void reconcileLocalHistoryViewChange(Collection<ViewChangeMessage> viewChangeMessages, SortedMap<Long, OrderedRequestMessageWrapper> calculatedHistory) {
long latestStableCheckpoint = viewChangeMessages.stream().map(ViewChangeMessage::getStableCheckpoint).max(Long::compareTo).orElse(-1L);
CommitCertificate maxCC = viewChangeMessages.stream().map(ViewChangeMessage::getCommitCertificate).max(Comparator.comparingLong(CommitCertificate::getSequenceNumber)).get();
// // when we perform a view change directly after a checkpoint
// // our maxCC is equal to this
// if (calculatedHistory.isEmpty()) {
// log.info("Calculated history is empty probably because we committed right before the view change");
// if (maxCC.getSequenceNumber() != latestStableCheckpoint) {
// throw new IllegalStateException("MaxCC sequence number is not equal to the latest stable checkpoint when calculated history is empty");
// }
// // we are up to speed, nothing to reconcile
// if (latestStableCheckpoint == this.getHighestSequenceNumber()) return;
// // to create a CC, we need 2f + 1 replicas or more to agree
// // to create new view message we need 2f + 1 replicas, so our maxCC will always equal the last cc.
// // we therefore roll back to the last CC, since we've already commmitted
// /// TODO: change this then
// if (latestStableCheckpoint < this.getHighestSequenceNumber() && this.getMessageLog().getLastCheckpoint() == latestStableCheckpoint) {
// rollbackToCheckpoint(latestStableCheckpoint, maxCC);
// return;
// }
// // we catch up to the checkpoint
// if (latestStableCheckpoint > this.getHighestSequenceNumber()) {
// this.catchUpToCheckpoint(latestStableCheckpoint, calculatedHistory, maxCC);
// return;
// }
// throw new IllegalStateException("Something went wrong in empty reconciliation");
// }

log.info("Replica " + this.getId() + " reconciling local history with maxCC " + maxCC.getSequenceNumber() + ", latest stable checkpoint " + latestStableCheckpoint + " and highest sequence number " + this.getHighestSequenceNumber());
if (calculatedHistory.isEmpty()) {
// nothing to reconcile
if (latestStableCheckpoint == this.getHighestSequenceNumber()) return;
Expand Down Expand Up @@ -1509,24 +1461,49 @@ else if (this.getHighestSequenceNumber() == latestStableCheckpoint) {
// if we have a higher commit certificate, we set it
this.handleCommitCertificate(maxCC);
}
// max-l > min-s and histories diverge
// max-l > min-s
else {
long lastHistoryKey = this.getHistory().getLastKey();
long lastHistoryKey;
try {
lastHistoryKey = this.getHistory().getLastKey();
} catch (NoSuchElementException e) {
log.warning("No last history key found");
return;
}

long lastHistory = this.getHistory().get(lastHistoryKey);
if (calculatedHistory.get(lastHistoryKey).getOrderedRequest().getHistory() == lastHistory) {
// handle the last commit certificate (make sure everything is committed)
// execute from max-l + 1

for (long i = this.getHighestSequenceNumber() + 1; i <= calculatedHistory.sequencedKeySet().getLast(); i++) {
this.executeOrderedRequest(calculatedHistory.get(i));
}
if (this.getMessageLog().getLastCheckpoint() < latestStableCheckpoint) {
this.getMessageLog().setLastCheckpoint(latestStableCheckpoint);
}
this.handleCommitCertificate(maxCC);

if (this.getMessageLog().getMaxCC().getSequenceNumber() < maxCC.getSequenceNumber()) {
this.handleCommitCertificate(maxCC);
}
}
// histories diverge and we roll back
else {
this.rollbackToCheckpoint(latestStableCheckpoint, calculatedHistory, maxCC);
if (this.getMessageLog().getMaxCC().getSequenceNumber() > maxCC.getSequenceNumber()) {
log.info("Diverging histories, rolling back to checkpoint");
this.getMessageLog().getOrderedMessages().clear();
this.getHistory().clear();
this.getMessageLog().getRequestCache().clear();
for (OrderedRequestMessageWrapper ormw : calculatedHistory.sequencedValues()) {
if (ormw.getOrderedRequest().getSequenceNumber() <= this.getMessageLog().getMaxCC().getSequenceNumber()) {
continue;
}
this.executeOrderedRequest(ormw);
}

} else {
this.rollbackToCheckpoint(latestStableCheckpoint, calculatedHistory, maxCC);
}
}
}

Expand All @@ -1544,6 +1521,7 @@ else if (this.getHighestSequenceNumber() == latestStableCheckpoint) {
* @param maxCC - the maxCC received from the view change messages
*/
private void catchUpToCheckpoint(long latestStableCheckpoint, SortedMap<Long, OrderedRequestMessageWrapper> calculatedHistory, CommitCertificate maxCC) {
log.info("Replica " + this.getId() + " catching up to checkpoint " + latestStableCheckpoint);
// set the latest checkpoint
this.getMessageLog().setLastCheckpoint(latestStableCheckpoint);
// clear the history
Expand All @@ -1570,23 +1548,7 @@ private void catchUpToCheckpoint(long latestStableCheckpoint, SortedMap<Long, Or
* @param maxCC - the maxCC received from the view change messages
*/
private void rollbackToCheckpoint(long latestStableCheckpoint, SortedMap<Long, OrderedRequestMessageWrapper> calculatedHistory, CommitCertificate maxCC) {
// // removes the checkpoint responses
// this.getMessageLog().getSpeculativeResponsesCheckpoint().clear();
// // puts the orm corresponding to the latest stable checkpoint back into the ordered messages
// OrderedRequestMessageWrapper ccRequest = this.getMessageLog().getOrderedMessages().get(latestStableCheckpoint);
// if (ccRequest == null) {
// log.warning("Couldn't find the checkpoint request");
// }
// this.getMessageLog().getOrderedMessages().clear();
// this.getMessageLog().putOrderedRequestMessageWrapper(ccRequest);
// this.getHistory().clear();
// if (maxCC.getSequenceNumber() != latestStableCheckpoint) {
// /// TODO: this doesn't seem right
// throw new IllegalStateException("MaxCC sequence number is not equal to the latest stable checkpoint when calculated history is empty");
// }
// this.getHistory().add(latestStableCheckpoint, maxCC.getHistory());
// this.setHighestSequenceNumber(latestStableCheckpoint);

log.info("Replica " + this.getId() + " rolling back to checkpoint " + latestStableCheckpoint);
// remove everything in the message logs
this.getMessageLog().getSpeculativeResponsesCheckpoint().clear();
this.getMessageLog().getOrderedMessages().clear();
Expand Down Expand Up @@ -1665,6 +1627,10 @@ private void handleNewViewMessage(String sender, NewViewMessage nvm) {
if(!isValidNewViewMessage(nvm)) {
return;
}
if (this.getMessageLog().getNewViewMessages().containsKey(nvm.getFutureViewNumber())) {
log.info("Received a new view message for a view number that we've already received");
return;
}

// add the new view message to the message log
this.getMessageLog().putNewViewMessage(nvm);
Expand Down Expand Up @@ -1724,11 +1690,6 @@ private void beginNewView(ViewConfirmMessage vcm) {
this.getMessageLog().getIHateThePrimaries().getOrDefault(this.getViewNumber(), new TreeMap<>()).clear();
this.getMessageLog().getFillHoleMessages().clear();

// this.getHistory().clear();
// this.getHistory().add(vcm.getLastKnownSequenceNumber(), vcm.getHistory());
/// TODO: See if this is higher than what we have so far, because it might mess with the ordering and cause a replica to skip.
// this.setHighestSequenceNumber(vcm.getLastKnownSequenceNumber());

// sets the view number and primary
log.info("Replica " +
this.getId() +
Expand Down Expand Up @@ -1813,6 +1774,7 @@ private void checkIfCommitCheckpoint(long sequenceNumber) {
* @param sequenceNumber - the sequence number to create the checkpoint for
*/
private void createCheckpoint(long sequenceNumber) {
log.info("Replica " + this.getId() + " created stable checkpoint for sequence number " + sequenceNumber);
// set the last checkpoint
this.getMessageLog().setLastCheckpoint(sequenceNumber);

Expand Down Expand Up @@ -1914,6 +1876,7 @@ private void checkIfCheckpoint(long sequenceNumber) {
);
cm.sign(this.getId());
this.broadcastMessage(cm);
log.info("Replica " + this.getId() + " sent a checkpoint message for sequence number " + sequenceNumber);
this.handleCheckpointMessage(this.getId(), cm);
}
}
Expand Down
4 changes: 2 additions & 2 deletions simulator/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ springdoc:
# ByzzBench configuration
byzzbench:
autostart: false # Whether to start running scenarios automatically on startup
numScenarios: 5000
numScenarios: 100
#outputPath: /tmp/byzzbench # The path to write the output to
outputSchedules: buggy # which schedules to write to file? one of 'all', 'buggy' or 'none'

scheduler:
id: "random" # The ID of the scheduler to use
id: "byzzfuzz" # The ID of the scheduler to use
executionMode: sync # async (default, any message delivered) or sync (communication-closure hypothesis, FIFO)
maxDropMessages: 0 # Maximum number of messages to drop per scenario
maxMutateMessages: 0 # Maximum number of messages to mutate per scenario
Expand Down

0 comments on commit 33a540a

Please sign in to comment.