Skip to content

Commit

Permalink
move deduplication logic to the holepunch service
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Sep 7, 2021
1 parent 2605890 commit 54861f8
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 30 deletions.
75 changes: 45 additions & 30 deletions p2p/protocol/holepunch/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,14 @@ const (

var (
log = logging.Logger("p2p-holepunch")
// ErrHolePunchActive is returned from DirectConnect when another hole punching attempt is currently running
ErrHolePunchActive = errors.New("another hole punching attempt to this peer is active")
// ErrClosed is returned when the hole punching is closed
ErrClosed = errors.New("hole punching service closing")
)

var ReplaceHosts = func(s string) string { return s }

// The Service is used to make direct connections with a peer via hole-punching.
type Service struct {
ctx context.Context
Expand Down Expand Up @@ -149,10 +155,42 @@ func (hs *Service) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, time.Duration,
return addrs, rtt, nil
}

func (hs *Service) beginDirectConnect(p peer.ID) error {
hs.closeMx.RLock()
defer hs.closeMx.RUnlock()
if hs.closed {
return ErrClosed
}

hs.activeMx.Lock()
defer hs.activeMx.Unlock()
if _, ok := hs.active[p]; ok {
return ErrHolePunchActive
}

hs.active[p] = struct{}{}
return nil
}

// DirectConnect attempts to make a direct connection with a remote peer.
// It first attempts a direct dial (if we have a public address of that peer), and then
// coordinates a hole punch over the given relay connection.
func (hs *Service) DirectConnect(rp peer.ID) error {
func (hs *Service) DirectConnect(p peer.ID) error {
log.Debugw("got inbound proxy conn from peer", p)
if err := hs.beginDirectConnect(p); err != nil {
return err
}

defer func() {
hs.activeMx.Lock()
delete(hs.active, p)
hs.activeMx.Unlock()
}()

return hs.directConnect(p)
}

func (hs *Service) directConnect(rp peer.ID) error {
// short-circuit check to see if we already have a direct connection
for _, c := range hs.host.Network().ConnsToPeer(rp) {
if !isRelayAddress(c.RemoteMultiaddr()) {
Expand Down Expand Up @@ -339,48 +377,25 @@ func addrsFromBytes(bzs [][]byte) []ma.Multiaddr {

type netNotifiee Service

func (nn *netNotifiee) Connected(_ network.Network, v network.Conn) {
func (nn *netNotifiee) Connected(_ network.Network, conn network.Conn) {
hs := (*Service)(nn)
dir := v.Stat().Direction

// Hole punch if it's an inbound proxy connection.
// If we already have a direct connection with the remote peer, this will be a no-op.
if dir == network.DirInbound && isRelayAddress(v.RemoteMultiaddr()) {
p := v.RemotePeer()
hs.activeMx.Lock()
hs.closeMx.RLock()
closed := hs.closed
_, active := hs.active[p]
if !active && !closed {
hs.refCount.Add(1)
hs.active[p] = struct{}{}
}
hs.closeMx.RUnlock()
hs.activeMx.Unlock()

if active || closed {
return
}

log.Debugw("got inbound proxy conn from peer", v.RemotePeer())
if conn.Stat().Direction == network.DirInbound && isRelayAddress(conn.RemoteMultiaddr()) {
hs.refCount.Add(1)
go func() {
defer hs.refCount.Done()
defer func() {
hs.activeMx.Lock()
delete(hs.active, p)
hs.activeMx.Unlock()
}()

select {
// waiting for Identify here will allow us to access the peer's public and observed addresses
// that we can dial to for a hole punch.
case <-hs.ids.IdentifyWait(v):
case <-hs.ids.IdentifyWait(conn):
case <-hs.ctx.Done():
return
}

_ = hs.DirectConnect(v.RemotePeer())
_ = hs.DirectConnect(conn.RemotePeer())
}()
return
}
}

Expand Down
7 changes: 7 additions & 0 deletions p2p/protocol/holepunch/coordination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package holepunch_test
import (
"context"
"net"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -328,6 +329,12 @@ func makeRelayedHosts(t *testing.T, h1Opt holepunch.Option) (h1, h2, relay host.
require.NoError(t, err)
h2 = mkHostWithStaticAutoRelay(t, context.Background(), relay)
hps = addHolePunchService(t, h2)
holepunch.ReplaceHosts = func(s string) string {
str := strings.ReplaceAll(s, h1.ID().Pretty(), "h1")
str = strings.ReplaceAll(str, h2.ID().Pretty(), "h2")
str = strings.ReplaceAll(str, relay.ID().Pretty(), "relay")
return str
}

// h1 has a relay addr
// h2 should connect to the relay addr
Expand Down

0 comments on commit 54861f8

Please sign in to comment.