-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfrb.cpp
62 lines (54 loc) · 1.97 KB
/
frb.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include "frb.h"
/**
 * Initialises the FIFO broadcast state and hooks this instance into the
 * underlying URB layer.
 *
 * - lsn is reset to 0 (frb_broadcast pre-increments it, so the first
 *   sequence number sent is 1).
 * - pending gets one message list per process.
 * - next gets one counter per process, each starting at 1, i.e. the first
 *   sequence number expected from every sender.
 *
 * NOTE(review): both arrays are allocated with new[] and no matching
 * delete[] is visible in this file — confirm that init() is called exactly
 * once per instance and that the owner releases them.
 */
void frb::init() {
    lsn = 0;

    pending = new std::list<Message>[nb_of_processes];

    next = new int[nb_of_processes];
    std::fill(next, next + nb_of_processes, 1);

    // Pass this instance to the URB layer (presumably so URB deliveries are
    // routed back up to it — confirm against urb's init()).
    urb_instance.init(this);
}
/**
 * Broadcasts a message originating at this process.
 *
 * The message is stamped with the next local sequence number and with this
 * process as both sender and initial sender, a 'b' entry is appended to the
 * in-memory log, and the message is then handed to the URB layer.
 *
 * @param message the message to send; taken by value, so the caller's copy
 *                is not modified by the stamping done here.
 */
void frb::frb_broadcast(Message message) {
    // Stamp the outgoing message.
    message.seq_no = ++lsn;
    message.initial_sender = my_process_id;
    message.sender = my_process_id;

    // Record the broadcast event in the log buffer.
    LogMessage entry;
    entry.sender = my_process_id;
    entry.seq_nr = message.seq_no;
    entry.message_type = 'b';
    messages_log[log_pointer++] = entry;

    // Once log_pointer reaches MAX_LOG_FILE, write_log() is invoked
    // (presumably it flushes the buffer and resets log_pointer — confirm).
    if (log_pointer == MAX_LOG_FILE) {
        write_log();
    }

    // Disseminate through uniform reliable broadcast.
    urb_instance.urbBroadcast(message);
}
/**
 * Handles a message coming up from the URB layer and delivers, in sequence
 * order per original sender, every message that has become deliverable.
 *
 * The message is queued in the pending list of its initial sender. The list
 * is then scanned:
 *   - seq_no == next[sender]: the expected message; the counter is advanced
 *     and the message is passed to urb_instance.bbb.deliver(), then removed.
 *   - seq_no <  next[sender]: that sequence number has already been
 *     delivered (next only advances after delivering it), so the entry is a
 *     duplicate. It is dropped without being delivered again, which keeps
 *     the no-duplication and FIFO-order guarantees intact.
 *   - seq_no >  next[sender]: a gap remains; the message stays pending.
 *
 * @param message the message received from the lower layer.
 *
 * NOTE(review): message.initial_sender is used as a 1-based index without a
 * range check; a value outside [1, number of processes] indexes out of
 * bounds — confirm the lower layer validates it.
 * NOTE(review): no locking is performed here (the previous revision only had
 * commented-out mutex calls) — confirm this is never invoked concurrently.
 */
void frb::frb_deliver(Message message) {
    // Process ids are 1-based; per-sender state is stored 0-based.
    int from_index = message.initial_sender - 1;
    std::list<Message>& queue = pending[from_index];

    queue.push_back(message);
    // Uses Message's operator< (defined elsewhere; presumably orders by
    // seq_no so that one pass below can drain a whole run — confirm).
    queue.sort();

    std::list<Message>::iterator it = queue.begin();
    while (it != queue.end()) {
        if (it->seq_no > next[from_index]) {
            // Not deliverable yet: an earlier message is still missing.
            ++it;
            continue;
        }

        if (it->seq_no == next[from_index]) {
            next[from_index]++;
            // Hand the message upward; per the original author's note this
            // is the path that runs the application-layer callback.
            urb_instance.bbb.deliver(*it);
        }
        // Delivered just now, or a stale duplicate: either way it leaves the
        // queue. erase() returns the iterator to the following element.
        it = queue.erase(it);
    }
}
/**
 * Delivery entry point of this layer: forwards the received message
 * unchanged to frb_deliver().
 *
 * @param message the message being delivered to this layer.
 */
void frb::deliver(Message message)
{
    frb_deliver(message);
}