Skip to content

Commit

Permalink
small bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Tezzish committed Jan 24, 2025
1 parent 3d6f207 commit 5d271cf
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void truncateFillHoleMessages(long sequenceNumber) {
}
}

public void putRequestCache(String clientId, RequestMessage rm, SpeculativeResponseWrapper srw) {
public void putResponseCache(String clientId, RequestMessage rm, SpeculativeResponseWrapper srw) {
if (this.highestTimestampInCacheForClient(clientId) > rm.getTimestamp()) {
if (clientId.equals("Noop")) {
return;
Expand All @@ -173,6 +173,10 @@ public void putRequestCache(String clientId, RequestMessage rm, SpeculativeRespo
this.getResponseCache().put(clientId, new ImmutablePair<>(rm, srw));
}

public void putRequestCache(String clientId, RequestMessage rm) {
this.getRequestCache().put(clientId, rm);
}

public void putOrderedRequestMessageWrapper(OrderedRequestMessageWrapper ormw) {
this.getOrderedMessages().put(ormw.getOrderedRequest().getSequenceNumber(), ormw);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ public void sendCommitMessage() {
// create the commit message
CommitMessage commitMessage = getCommitMessage(specResponse, signedBy);
commitMessage.sign(this.getId());
log.info("Sending commit message until response for " + this.getLastRequest().getOperation());
sendCommitUntilResponse(commitMessage);
}

Expand Down Expand Up @@ -227,7 +226,7 @@ private CommitMessage getCommitMessage(SpeculativeResponse specResponse, List<St
private void sendCommitUntilResponse(CommitMessage commitMessage) {
SortedSet<String> recipientIds = (SortedSet<String>) this.getScenario().getReplicas().keySet();
this.getScenario().getTransport().multicast(this, recipientIds, commitMessage);
log.info("Sending commit message to all replicas for " + this.getLastRequest().getOperation());
log.info("Sending commit message until response for " + this.getLastRequest().getOperation());
// Sets the timeout, clears the local commits if the number of local commits is greater than 2f
this.setTimeout("commitTimeout", () -> {
if (this.getLocalCommits().getOrDefault(this.getLastDigest(), new TreeSet<>()).size() >= 2 * this.getNumFaults() + 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,9 @@ public void handleClientRequest(String clientId, Serializable request) {
log.info("Replica " + this.getId() + " ordering request with sequence number " + orm.getSequenceNumber());
orm.sign(this.getId());
OrderedRequestMessageWrapper ormw = new OrderedRequestMessageWrapper(orm, requestMessage);
this.broadcastMessageIncludingSelf(ormw);
// this.handleOrderedRequestMessageWrapper(this.getId(), ormw);
this.broadcastMessage(ormw);
// this.broadcastMessageIncludingSelf(ormw);
this.handleOrderedRequestMessageWrapper(this.getId(), ormw);
} else if (this.getId().equals(this.getLeaderId())) {
log.warning("Retrieving cache as primary");
SpeculativeResponseWrapper srw = this.getMessageLog().getResponseCache().get(clientId).getRight();
Expand Down Expand Up @@ -323,7 +324,10 @@ public void handleOrderedRequestMessageWrapper(String sender, OrderedRequestMess
return;
}
try {
this.clearTimeout(this.getRequestTimeoutId());
if (this.getMessageLog().getRequestCache().containsKey(ormw.getRequestMessage().getClientId()) && this.getMessageLog().getRequestCache().get(ormw.getRequestMessage().getClientId()).equals(ormw.getRequestMessage())) {
log.info("Cleared forward to primary timeout");
this.clearTimeout(this.getForwardToPrimaryTimeoutId());
}
} catch (IllegalArgumentException e) {
log.warning("Failed to clear request timeout, possibly because it's been triggered");
}
Expand All @@ -348,7 +352,8 @@ public void handleOrderedRequestMessageWrapper(String sender, OrderedRequestMess

// check if the ordered request message is valid
// IMPORTANT: as the primary, we don't check if it's valid because it won't allow for byzantine behavior
if (!this.getId().equals(this.getLeaderId()) && !this.isValidOrderedRequestMessage(ormw.getOrderedRequest(), ormw.getRequestMessage())) {
// !this.getId().equals(this.getLeaderId()) &&
if (!this.isValidOrderedRequestMessage(ormw.getOrderedRequest(), ormw.getRequestMessage())) {
return;
}

Expand Down Expand Up @@ -387,7 +392,7 @@ public SpeculativeResponseWrapper executeOrderedRequest(OrderedRequestMessageWra
// updates the ordered messages
this.getMessageLog().getOrderedMessages().put(ormw.getOrderedRequest().getSequenceNumber(), ormw);
// updates the request cache
this.getMessageLog().putRequestCache(clientId, ormw.getRequestMessage(), srw);
this.getMessageLog().putResponseCache(clientId, ormw.getRequestMessage(), srw);

// checkpointing
if (ormw.getOrderedRequest().getSequenceNumber() % this.getCP_INTERVAL() == 0) {
Expand Down Expand Up @@ -894,12 +899,14 @@ public void forwardToPrimary(String clientId, RequestMessage rm) {
this.sendMessage(crm, this.getLeaderId());
log.info("Replica " + this.getId() + " sent a confirm request message to " + this.getLeaderId());

this.getMessageLog().putRequestCache(clientId, rm);

/// TODO: would be nice to include the ADP here but not necessary
this.setForwardToPrimaryTimeoutId(this.setTimeout(
"forwardToPrimary",
() -> {
// we don't check if we've received here because in most cases, we get more recent requests
// and we don't know if we've received it. So we check the property in the ordered request message
// and we don't know if we've received it. So we check the property in the handleOrderedRequestMessage()
log.warning("Failed to forward to primary, init view change");
IHateThePrimaryMessage ihtpm = new IHateThePrimaryMessage(this.getViewNumber());
ihtpm.sign(this.getId());
Expand Down Expand Up @@ -1591,6 +1598,9 @@ private void rollbackToCheckpoint(long latestStableCheckpoint, SortedMap<Long, O
for (OrderedRequestMessageWrapper ormw : calculatedHistory.sequencedValues()) {
this.executeOrderedRequest(ormw);
}
if (maxCC.getSequenceNumber() > this.getHighestSequenceNumber()) {
log.warning("MaxCC sequence number is greater than the highest sequence number");
}
// handles the commit certificate
this.handleCommitCertificate(maxCC);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,11 @@ public void accept(FaultContext serializable) {
OrderedRequestMessageWrapper mutatedMessage = message.withOrderedRequest(mutatedOrm);
messageEvent.setPayload(mutatedMessage);
}
}, new MessageMutationFault("zyzzyva-ordered-request-history-dec", "Decrement History", List.of(OrderedRequestMessageWrapper.class)) {
}, new MessageMutationFault("zyzzyva-ordered-request-digest-random", "Digest random", List.of(OrderedRequestMessageWrapper.class)) {
// Increment the history field of the OrderedRequestMessage, not the next history :(
@Override
public void accept(FaultContext serializable) {

Optional<Event> event = serializable.getEvent();
if (event.isEmpty()) {
throw invalidMessageTypeException;
Expand All @@ -139,16 +140,18 @@ public void accept(FaultContext serializable) {
}

OrderedRequestMessage orm = message.getOrderedRequest();
OrderedRequestMessage mutatedOrm = orm.withHistory(orm.getHistory() - 1);
byte[] mutatedDigest = new byte[orm.getDigest().length];
Random random = new Random();
random.nextBytes(mutatedDigest);
OrderedRequestMessage mutatedOrm = orm.withDigest(mutatedDigest);
mutatedOrm.sign(orm.getSignedBy());
OrderedRequestMessageWrapper mutatedMessage = message.withOrderedRequest(mutatedOrm);
messageEvent.setPayload(mutatedMessage);
}
}, new MessageMutationFault("zyzzyva-ordered-request-digest-random", "Digest random", List.of(OrderedRequestMessageWrapper.class)) {
// Increment the history field of the OrderedRequestMessage, not the next history :(
}, new MessageMutationFault("zyzzyva-ordered-request-first-history", "First History", List.of(OrderedRequestMessageWrapper.class)) {
// Make this request the first one. Necessary for Abraham
@Override
public void accept(FaultContext serializable) {

Optional<Event> event = serializable.getEvent();
if (event.isEmpty()) {
throw invalidMessageTypeException;
Expand All @@ -161,10 +164,7 @@ public void accept(FaultContext serializable) {
}

OrderedRequestMessage orm = message.getOrderedRequest();
byte[] mutatedDigest = new byte[orm.getDigest().length];
Random random = new Random();
random.nextBytes(mutatedDigest);
OrderedRequestMessage mutatedOrm = orm.withDigest(mutatedDigest);
OrderedRequestMessage mutatedOrm = orm.withHistory(Arrays.hashCode(orm.getDigest()));
mutatedOrm.sign(orm.getSignedBy());
OrderedRequestMessageWrapper mutatedMessage = message.withOrderedRequest(mutatedOrm);
messageEvent.setPayload(mutatedMessage);
Expand Down
2 changes: 1 addition & 1 deletion simulator/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ springdoc:
# ByzzBench configuration
byzzbench:
autostart: false # Whether to start running scenarios automatically on startup
numScenarios: 500
numScenarios: 5000
#outputPath: /tmp/byzzbench # The path to write the output to
outputSchedules: buggy # which schedules to write to file? one of 'all', 'buggy' or 'none'

Expand Down

0 comments on commit 5d271cf

Please sign in to comment.