Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ability for P2P to wait for pushlog by peer #1098

Merged
merged 2 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions net/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (s *server) pushLog(ctx context.Context, evt events.Update, pid peer.ID) er
DocKey: &pb.ProtoDocKey{DocKey: dockey},
Cid: &pb.ProtoCid{Cid: evt.Cid},
SchemaID: []byte(evt.SchemaID),
Creator: s.peer.host.ID().String(),
Log: &pb.Document_Log{
Block: evt.Block.RawData(),
},
Expand Down
207 changes: 110 additions & 97 deletions net/pb/net.pb.go

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions net/pb/net.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@ message PushLogRequest {
Body body = 1;

message Body {
// docKey is the target DocKey.
// docKey is the DocKey of the document that is affected by the log.
bytes docKey = 1 [(gogoproto.customtype) = "ProtoDocKey"];
// cid is the target CID.
// cid is the CID of the composite of the document.
bytes cid = 2 [(gogoproto.customtype) = "ProtoCid"];
//
// schemaID is the SchemaID of the collection that the document resides in.
bytes schemaID = 3;
// record is the actual record payload.
Document.Log log = 4;
// creator is the peer ID of the peer that created the log.
string creator = 4;
// log hold the block that represent version of the document.
Document.Log log = 5;
}
}

Expand Down
5 changes: 4 additions & 1 deletion net/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ func (p *Peer) RegisterNewDocument(
DocKey: &pb.ProtoDocKey{DocKey: dockey},
Cid: &pb.ProtoCid{Cid: c},
SchemaID: []byte(schemaID),
Creator: p.host.ID().String(),
Copy link
Contributor

@AndrewSisley AndrewSisley Feb 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: I do worry a bit that we'll end up running into some kind of privacy/security issue by either broadcasting or relying-on fixed host ids like this, a problem for another day (if ever) though I think

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ID was already used in the P2P exchange so I'm not sure if adding it here make a huge difference? You think it does?

John was also saying we'll need the creator ID at some point in the future anyways for state-proofs if I recall correctly.

Copy link
Contributor

@AndrewSisley AndrewSisley Feb 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ID was already used in the P2P exchange so I'm not sure if adding it here make a huge difference? You think it does?

I really don't know. Just feels odd to see a trackable, fixed, actor identifier in a (privacy focused?) decentralised system. I also no idea how avoidable such a thing is. But it might be good to avoid spreading its use where we can do so easily (not saying it is easy here, I have no idea).

Log: &pb.Document_Log{
Block: nd.RawData(),
},
Expand Down Expand Up @@ -602,6 +603,7 @@ func (p *Peer) handleDocUpdateLog(evt events.Update) error {
DocKey: &pb.ProtoDocKey{DocKey: dockey},
Cid: &pb.ProtoCid{Cid: evt.Cid},
SchemaID: []byte(evt.SchemaID),
Creator: p.host.ID().String(),
Log: &pb.Document_Log{
Block: evt.Block.RawData(),
},
Expand Down Expand Up @@ -697,7 +699,8 @@ func stopGRPCServer(ctx context.Context, server *grpc.Server) {
}

type EvtReceivedPushLog struct {
Peer peer.ID
ByPeer peer.ID
FromPeer peer.ID
}

type EvtPubSub struct {
Expand Down
7 changes: 6 additions & 1 deletion net/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,13 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
}

if s.pushLogEmitter != nil {
byPeer, err := libpeer.Decode(req.Body.Creator)
if err != nil {
log.Info(ctx, "could not decode the peer id of the log creator", logging.NewKV("Error", err.Error()))
}
err = s.pushLogEmitter.Emit(EvtReceivedPushLog{
Peer: pid,
FromPeer: pid,
ByPeer: byPeer,
})
if err != nil {
// logging instead of returning an error because the event bus should
Expand Down
31 changes: 27 additions & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,16 +311,39 @@ func (n *Node) WaitForPubSubEvent(id peer.ID) error {
}
}

// WaitForPushLogEvent listens to the event channel for a push log event from a given peer.
// WaitForPushLogByPeerEvent listens to the event channel for a push log event by a given peer.
//
// By refers to the log creator. It can be different than the log sender.
//
// It will block the calling thread until an event is yielded to an internal channel. This
// event is not nessecarily the next event and is dependent on the number of concurrent callers
// event is not necessarily the next event and is dependent on the number of concurrent callers
// (each event will only notify a single caller, not all of them).
func (n *Node) WaitForPushLogEvent(id peer.ID) error {
func (n *Node) WaitForPushLogByPeerEvent(id peer.ID) error {
for {
select {
case evt := <-n.pushLogEvent:
if evt.Peer != id {
if evt.ByPeer != id {
continue
}
return nil
case <-time.After(evtWaitTimeout):
return errors.New("waiting for pushlog timed out")
}
}
}

// WaitForPushLogFromPeerEvent listens to the event channel for a push log event from a given peer.
//
// From refers to the log sender. It can be different that the log creator.
//
// It will block the calling thread until an event is yielded to an internal channel. This
// event is not necessarily the next event and is dependent on the number of concurrent callers
// (each event will only notify a single caller, not all of them).
func (n *Node) WaitForPushLogFromPeerEvent(id peer.ID) error {
for {
select {
case evt := <-n.pushLogEvent:
if evt.FromPeer != id {
continue
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/net/order/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func executeTestCase(t *testing.T, test P2PTestCase) {
continue
}
log.Info(ctx, fmt.Sprintf("Waiting for node %d to sync with peer %d", n2, n))
err := p.WaitForPushLogEvent(nodes[n].PeerID())
err := p.WaitForPushLogByPeerEvent(nodes[n].PeerID())
require.NoError(t, err)
log.Info(ctx, fmt.Sprintf("Node %d synced", n2))
}
Expand Down Expand Up @@ -321,7 +321,7 @@ func executeTestCase(t *testing.T, test P2PTestCase) {
}
for _, rep := range reps {
log.Info(ctx, fmt.Sprintf("Waiting for node %d to sync with peer %d", rep, n))
err := nodes[rep].WaitForPushLogEvent(nodes[n].PeerID())
err := nodes[rep].WaitForPushLogByPeerEvent(nodes[n].PeerID())
require.NoError(t, err)
log.Info(ctx, fmt.Sprintf("Node %d synced", rep))

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/net/state/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func waitForNodesToSync(
wg *sync.WaitGroup,
) {
log.Info(ctx, fmt.Sprintf("Waiting for node %d to sync with peer %d", targetIndex, sourceIndex))
err := nodes[targetIndex].WaitForPushLogEvent(nodes[sourceIndex].PeerID())
err := nodes[targetIndex].WaitForPushLogByPeerEvent(nodes[sourceIndex].PeerID())
// This must be an assert and not a require, a panic here will block the test as
// the wait group will never complete.
assert.NoError(t, err)
Expand Down