Skip to content

Commit

Permalink
fix build
Browse files Browse the repository at this point in the history
  • Loading branch information
joaomlneto committed Mar 3, 2025
1 parent f333b33 commit 107ce5b
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 153 deletions.
40 changes: 20 additions & 20 deletions simulator/src/main/java/byzzbench/simulator/HbftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,39 +71,39 @@ public class HbftClient extends Client {
*/
@Override
public void sendRequest() {
String requestId = String.format("%s/%d", super.id, super.requestSequenceNumber.incrementAndGet());
String requestId = String.format("%s/%d", getId(), getRequestSequenceNumber().incrementAndGet());
long timestamp = this.getCurrentTime().toEpochMilli();
RequestMessage request = new RequestMessage(requestId, timestamp, super.id);
this.sentRequests.put(super.requestSequenceNumber.get(), request);
RequestMessage request = new RequestMessage(requestId, timestamp, getId());
this.sentRequests.put(getRequestSequenceNumber().get(), request);
this.sentRequestsByTimestamp.put(timestamp, requestId);
this.broadcastRequest(timestamp, requestId);

// Set timeout
Long timeoutId = this.setTimeout("REQUEST", this::retransmitOrPanic, this.timeout);
timeouts.put(super.requestSequenceNumber.get(), timeoutId);
timeouts.put(getRequestSequenceNumber().get(), timeoutId);
}

public void retransmitOrPanic() {
long tolerance = (long) Math.floor((super.scenario.getTransport().getNodeIds().size() - 1) / 3);
long tolerance = (long) Math.floor((getScenario().getTransport().getNodeIds().size() - 1) / 3);
if (this.shouldRetransmit(tolerance)) {
String requestId = String.format("%s/%d", super.id, super.requestSequenceNumber.get());
String requestId = String.format("%s/%d", getId(), getRequestSequenceNumber().get());
// Based on hBFT 4.1 it uses the identical request
// TODO: It probably should not be the same timestamp
long timestamp = this.sentRequests.get(super.requestSequenceNumber.get()).getTimestamp();
long timestamp = this.sentRequests.get(getRequestSequenceNumber().get()).getTimestamp();
this.broadcastRequest(timestamp, requestId);
} else if (this.shouldPanic(tolerance)) {
RequestMessage message = this.sentRequests.get(super.requestSequenceNumber.get());
PanicMessage panic = new PanicMessage(this.digest(message), this.getCurrentTime().toEpochMilli(), super.id);
super.scenario.getTransport().multicast(this, super.scenario.getTransport().getNodeIds(), panic);
RequestMessage message = this.sentRequests.get(getRequestSequenceNumber().get());
PanicMessage panic = new PanicMessage(this.digest(message), this.getCurrentTime().toEpochMilli(), getId());
getScenario().getTransport().multicast(this, getScenario().getTransport().getNodeIds(), panic);
}
this.clearTimeout(timeouts.get(super.requestSequenceNumber.get()));
this.clearTimeout(timeouts.get(getRequestSequenceNumber().get()));
Long timeoutId = this.setTimeout("REQUEST", this::retransmitOrPanic, this.timeout);
timeouts.put(super.requestSequenceNumber.get(), timeoutId);
timeouts.put(getRequestSequenceNumber().get(), timeoutId);
}

private void broadcastRequest(long timestamp, String requestId) {
MessagePayload payload = new ClientRequestMessage(timestamp, requestId);
SortedSet<String> replicaIds = super.scenario.getTransport().getNodeIds();
SortedSet<String> replicaIds = getScenario().getTransport().getNodeIds();
getScenario().getTransport().multicast(this, replicaIds, payload);
}

Expand Down Expand Up @@ -131,9 +131,9 @@ public void handleMessage(String senderId, MessagePayload payload) {
*/
if (this.completedReplies(clientReplyMessage.getTolerance())
&& !this.completedRequests.contains(key)
&& super.requestSequenceNumber.get() <= this.maxRequests) {
&& getRequestSequenceNumber().get() <= getMaxRequests()) {
this.completedRequests.add(key);
this.clearTimeout(this.timeouts.get(super.requestSequenceNumber.get()));
this.clearTimeout(this.timeouts.get(getRequestSequenceNumber().get()));
this.sendRequest();
}
}
Expand All @@ -148,15 +148,15 @@ public void handleMessage(String senderId, MessagePayload payload) {
*/
public long setTimeout(String name, Runnable r, long timeout) {
Duration duration = Duration.ofSeconds(timeout);
return super.scenario.getTransport().setTimeout(this, r, duration, name);
return getScenario().getTransport().setTimeout(this, r, duration, name);
}

/**
* Checks whether client should retransmit the request
* if #replies < f + 1
*/
public boolean shouldRetransmit(long tolerance) {
String currRequest = String.format("%s/%d", super.id, super.requestSequenceNumber.get());
String currRequest = String.format("%s/%d", getId(), getRequestSequenceNumber().get());
if (!hbftreplies.containsKey(currRequest)) {
return true;
}
Expand All @@ -171,7 +171,7 @@ public boolean shouldRetransmit(long tolerance) {
* if f + 1 <= #replies < 2f + 1
*/
public boolean shouldPanic(long tolerance) {
String currRequest = String.format("%s/%d", super.id, super.requestSequenceNumber.get());
String currRequest = String.format("%s/%d", getId(), getRequestSequenceNumber().get());
for (ClientReplyKey key : hbftreplies.get(currRequest).keySet()) {
return this.hbftreplies.get(currRequest).get(key).size() >= tolerance + 1
&& this.hbftreplies.get(currRequest).get(key).size() < tolerance * 2 + 1;
Expand All @@ -183,7 +183,7 @@ public boolean shouldPanic(long tolerance) {
* Checks whether it has received 2f + 1 replies
*/
public boolean completedReplies(long tolerance) {
String currRequest = String.format("%s/%d", super.id, super.requestSequenceNumber.get());
String currRequest = String.format("%s/%d", getId(), getRequestSequenceNumber().get());
if (!hbftreplies.containsKey(currRequest)) {
return false;
}
Expand All @@ -199,7 +199,7 @@ public boolean completedReplies(long tolerance) {
* Clear all timeouts for this client.
*/
// public void clearAllTimeouts() {
// super.scenario.getTransport().clearClientTimeouts(super.id);
// getScenario.getTransport().clearClientTimeouts(getId());
// }

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void sendRequest(String senderId) {
String command = String.format("%s/%d", this.getId(), sequenceNumber);
// TODO: compute the digest
RequestMessage request = new RequestMessage(this.getId(), sequenceNumber, "-1", command);
this.getScenario().getTransport().sendClientRequest(this.getId(), request, senderId);
this.getScenario().getTransport().sendMessage(this, request, senderId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,41 @@
@Log
@Getter
public class FastByzantineReplica extends LeaderBasedProtocolReplica {
// The number of replicas in the system with each role.
private final int p, a, l, f;
// Timeout until the proposer replica starts suspecting the leader for the lack of progress.
private final long messageTimeoutDuration;
@JsonIgnore
private final Transport transport;
// Current client ID that the system is communicating with.
private final String clientId;
/**
* The log of received messages for the replica.
*/
@JsonIgnore
private final MessageLog messageLog;
// In the "Fast Byzantine Consensus" protocol, a replica can have one or more roles.
// The possible roles are : PROPOSER, ACCEPTOR, LEARNER.
// The required number of replicas with each role is:
// p ( proposers ) = 3 * f + 1, a ( acceptors ) = 5 * f + 1, l ( learners ) = 3 * f + 1.
private List<Role> roles = new ArrayList<>();

// The number of replicas in the system with each role.
private final int p, a, l, f;
// The message round number.
private long viewNumber;
private long proposalNumber;
// The value that the leader replica is proposing - changes each round, depending on the client request.
private byte[] proposedValue;
// The progress certificate for the current view.
private ProgressCertificate pc;
// Timeout until the proposer replica starts suspecting the leader for the lack of progress.
private final long messageTimeoutDuration;
// Current leader.
@Setter
private String leaderId;
@Setter
private boolean isCurrentlyLeader;

// The set of node IDs in the system for each role.
private SortedSet<String> acceptorNodeIds = new TreeSet<>();
private SortedSet<String> learnerNodeIds = new TreeSet<>();
private SortedSet<String> proposerNodeIds = new TreeSet<>();;
private SortedSet<String> nodeIds = new TreeSet<>();;

@JsonIgnore
private final Transport transport;

// Current client ID that the system is communicating with.
private String clientId;

private SortedSet<String> proposerNodeIds = new TreeSet<>();
private SortedSet<String> nodeIds = new TreeSet<>();
private boolean isSatisfied;
// Keep track of the timeouts set by the replica.
private long learnerTimeoutId = -1;
Expand All @@ -67,11 +67,6 @@ public class FastByzantineReplica extends LeaderBasedProtocolReplica {
@Setter
private boolean isRecovered;
private int forwards;
/**
* The log of received messages for the replica.
*/
@JsonIgnore
private MessageLog messageLog;
private boolean committed;
private byte[] operation;
private long viewChangeRequests = 0;
Expand Down Expand Up @@ -188,15 +183,16 @@ private void leaderOnStart(ProposeMessage proposeMessage) {
private void proposerOnStart() {
int learnedThreshold = (int) Math.ceil((l + f + 1) / 2.0);

this.proposerTimeoutId = this.setTimeout(
"proposer-timeout",
() -> {
if (!messageLog.proposerHasLearned(learnedThreshold)) {
log.warning("Leader suspected by proposer " + getId());
broadcastMessageIncludingSelf(new SuspectMessage(getId(), this.leaderId, this.viewNumber));
}
}, Duration.ofMillis(messageTimeoutDuration));
this.proposerTimeoutId = this.setTimeout(
"proposer-timeout",
() -> {
if (!messageLog.proposerHasLearned(learnedThreshold)) {
log.warning("Leader suspected by proposer " + getId());
broadcastMessageIncludingSelf(new SuspectMessage(getId(), this.leaderId, this.viewNumber));
}
}, Duration.ofMillis(messageTimeoutDuration));
}

/**
* Learner replica starts by awaiting to learn a value.
* If it has not learned any value after waiting, send a PULL message to all other LEARNER replicas.
Expand All @@ -218,10 +214,10 @@ private void learnerOnStart() {

/**
* Handle a client request received by the replica.
*
* @param clientId the ID of the client
* @param request the request payload
*/
@Override
public void handleClientRequest(String clientId, Serializable request) throws UnsupportedOperationException {
// If the client request is not send to the leader, forward it to the leader.
if (!isLeader()) {
Expand Down Expand Up @@ -302,7 +298,8 @@ private void handleForwardClientRequest(String sender, ForwardClientRequest forw

/**
* Handle a PROPOSE message send by a replica with Proposer role, who is the leader, received by all Acceptor replicas.
* @param sender : the nodeId of the sender (the current leader)
*
* @param sender : the nodeId of the sender (the current leader)
* @param proposeMessage : the PROPOSE message with the proposed value and round number
*/
private void handleProposeMessage(String sender, ProposeMessage proposeMessage) {
Expand All @@ -320,7 +317,8 @@ private void handleProposeMessage(String sender, ProposeMessage proposeMessage)

/**
* Handle an ACCEPT message sent by an Acceptor replica, received by a Learner replica.
* @param sender : the nodeId of the sender (an Acceptor replica)
*
* @param sender : the nodeId of the sender (an Acceptor replica)
* @param acceptMessage : the ACCEPT message with the value and proposal number
*/
private void handleAcceptMessage(String sender, AcceptMessage acceptMessage) {
Expand Down Expand Up @@ -354,7 +352,8 @@ private void handleAcceptMessage(String sender, AcceptMessage acceptMessage) {

/**
* Handle a LEARN message sent by a Proposer replica, received by a Proposer replica.
* @param sender : the nodeId of the sender (a Learner replica)
*
* @param sender : the nodeId of the sender (a Learner replica)
* @param learnMessage : the LEARN message with the value and proposal number
*/
private void handleLearnMessageProposer(String sender, LearnMessage learnMessage) {
Expand Down Expand Up @@ -390,7 +389,8 @@ private void handleLearnMessageProposer(String sender, LearnMessage learnMessage

/**
* Handle a LEARN message sent by a Learner replica, received by a Learner replica.
* @param sender : the nodeId of the sender (a Learner replica)
*
* @param sender : the nodeId of the sender (a Learner replica)
* @param learnMessage : the LEARN message with the value and proposal number
*/
private void handleLearnMessageLearner(String sender, LearnMessage learnMessage) {
Expand Down Expand Up @@ -428,7 +428,8 @@ private void handleLearnMessageLearner(String sender, LearnMessage learnMessage)

/**
* Handle a SATISFIED message received by a Proposer replica.
* @param sender : the nodeId of the sender (a Proposer replica)
*
* @param sender : the nodeId of the sender (a Proposer replica)
* @param satisfiedMessage : the SATISFIED message with the value and proposal number
*/
private void handleSatisfiedMessage(String sender, SatisfiedMessage satisfiedMessage) {
Expand Down Expand Up @@ -466,7 +467,8 @@ private void handleSatisfiedMessage(String sender, SatisfiedMessage satisfiedMes
/**
* Handle a QUERY message received by an Acceptor replica.
* This message is sent after a new leader has been elected.
* @param sender : the nodeId of the sender (the new leader)
*
* @param sender : the nodeId of the sender (the new leader)
* @param queryMessage : the QUERY message with the view number and progress certificate
*/
private void handleQueryMessage(String sender, QueryMessage queryMessage) {
Expand Down Expand Up @@ -494,7 +496,8 @@ private void handleQueryMessage(String sender, QueryMessage queryMessage) {

/**
* Handle a PULL message received by a Learner replica. Should send the learned value, if any.
* @param sender : the nodeId of the sender (a Learner replica)
*
* @param sender : the nodeId of the sender (a Learner replica)
* @param pullMessage : the PULL message
*/
private void handlePullMessage(String sender, PullMessage pullMessage) {
Expand All @@ -515,7 +518,8 @@ private void handlePullMessage(String sender, PullMessage pullMessage) {

/**
* Handle a SUSPECT message received by a replica.
* @param sender : the nodeId of the sender (a replica that suspects the leader)
*
* @param sender : the nodeId of the sender (a replica that suspects the leader)
* @param suspectMessage : the SUSPECT message with the nodeId of the suspected leader
*/
private void handleSuspectMessage(String sender, SuspectMessage suspectMessage) {
Expand All @@ -535,7 +539,8 @@ private void handleSuspectMessage(String sender, SuspectMessage suspectMessage)

/**
* Handle a REPLY message received by the leader replica.
* @param sender : the nodeId of the sender (an Acceptor replica which was queried)
*
* @param sender : the nodeId of the sender (an Acceptor replica which was queried)
* @param replyMessage : the REPLY message with the value and round number of the previous round
*/
public void handleReplyMessage(String sender, ReplyMessage replyMessage) {
Expand Down Expand Up @@ -579,7 +584,8 @@ public void handleReplyMessage(String sender, ReplyMessage replyMessage) {
* Handle a VIEW_CHANGE message received by a replica.
* This is used to update the view number and leader ID sent to all the other replicas
* after a replica elected a new leader.
* @param sender : the nodeId of the sender (the replica that elected a new leader)
*
* @param sender : the nodeId of the sender (the replica that elected a new leader)
* @param viewChangeMessage : the VIEW_CHANGE message with the new view number and leader ID
*/
private void handleViewChangeMessage(String sender, ViewChangeMessage viewChangeMessage) {
Expand Down Expand Up @@ -695,9 +701,10 @@ private String getNewLeader() {

/**
* Move replica to the next view number and reset the replica.
*
* @param newViewNumber : the new view number
* @param sender : the nodeId of the sender that triggered the reset
* @param message : the message that caused the reset
* @param sender : the nodeId of the sender that triggered the reset
* @param message : the message that caused the reset
*/
public void reset(long newViewNumber, String sender, MessagePayload message) {
// Since we are resetting the replica, previous timeouts should be cleared
Expand All @@ -715,7 +722,9 @@ public void reset(long newViewNumber, String sender, MessagePayload message) {
onStart();
}

/** Methods for checking the roles of the replica. **/
/**
* Methods for checking the roles of the replica.
**/
private boolean isAcceptor() {
return this.roles.contains(Role.ACCEPTOR);
}
Expand Down
Loading

0 comments on commit 107ce5b

Please sign in to comment.