Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: enable p2p http txsync #5922

Merged
merged 6 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package mocks

import (
"context"
"errors"
"net"
"net/http"

Expand Down Expand Up @@ -76,11 +77,6 @@ func (network *MockNetwork) GetPeers(options ...network.PeerOption) []network.Pe
return nil
}

// GetRoundTripper -- returns the network round tripper
func (network *MockNetwork) GetRoundTripper(peer network.Peer) http.RoundTripper {
return http.DefaultTransport
}

// Ready - always ready
func (network *MockNetwork) Ready() chan struct{} {
c := make(chan struct{})
Expand Down Expand Up @@ -115,3 +111,8 @@ func (network *MockNetwork) GetGenesisID() string {
}
return network.GenesisID
}

// GetHTTPClient returns a http.Client with a suitable for the network
func (network *MockNetwork) GetHTTPClient(p network.HTTPPeer) (*http.Client, error) {
return nil, errors.New("not implemented")
}
2 changes: 2 additions & 0 deletions network/addr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,12 @@ func TestParseHostOrURL(t *testing.T) {
t.Run(addr, func(t *testing.T) {
_, err := ParseHostOrURL(addr)
require.Error(t, err, "url should fail", addr)
require.False(t, IsMultiaddr(addr))
})
t.Run(addr+"-multiaddr", func(t *testing.T) {
_, err := ParseHostOrURLOrMultiaddr(addr)
require.Error(t, err, "url should fail", addr)
require.False(t, IsMultiaddr(addr))
})
}

Expand Down
5 changes: 3 additions & 2 deletions network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ type GossipNode interface {
// ClearHandlers deregisters all the existing message handlers.
ClearHandlers()

// GetRoundTripper returns a Transport that would limit the number of outgoing connections.
GetRoundTripper(peer Peer) http.RoundTripper
// GetHTTPClient returns a http.Client with a suitable for the network Transport
// that would also limit the number of outgoing connections.
GetHTTPClient(peer HTTPPeer) (*http.Client, error)

// OnNetworkAdvance notifies the network library that the agreement protocol was able to make a notable progress.
// this is the only indication that we have that we haven't formed a clique, where all incoming messages
Expand Down
14 changes: 7 additions & 7 deletions network/hybridNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,14 @@
n.wsNetwork.ClearHandlers()
}

// GetRoundTripper returns a Transport that would limit the number of outgoing connections.
func (n *HybridP2PNetwork) GetRoundTripper(peer Peer) http.RoundTripper {
// TODO today this is used by HTTPTxSync.Sync after calling GetPeers(network.PeersPhonebookRelays)
switch p := peer.(type) {
// GetHTTPClient returns a http.Client with a suitable for the network Transport
// that would also limit the number of outgoing connections.
func (n *HybridP2PNetwork) GetHTTPClient(peer HTTPPeer) (*http.Client, error) {
switch peer.(type) {

Check warning on line 186 in network/hybridNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/hybridNetwork.go#L185-L186

Added lines #L185 - L186 were not covered by tests
case *wsPeer:
return p.net.GetRoundTripper(peer)
case gossipSubPeer:
return p.net.GetRoundTripper(peer)
return n.wsNetwork.GetHTTPClient(peer)
case *wsPeerCore:
return n.p2pNetwork.GetHTTPClient(peer)

Check warning on line 190 in network/hybridNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/hybridNetwork.go#L188-L190

Added lines #L188 - L190 were not covered by tests
default:
panic("unrecognized peer type")
}
Expand Down
4 changes: 2 additions & 2 deletions network/p2p/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func setupDHTHosts(t *testing.T, numHosts int) []*dht.IpfsDHT {
require.NoError(t, err)
// this is a workaround for the following issue
// "failed to negotiate security protocol: error reading handshake message: noise: message is too short"
// it appears simultenous connectino attempts (dht.New() attempts to connect) causes this handshake error.
// it appears simultaneous connection attempts (dht.New() attempts to connect) causes this handshake error.
// https://github.com/libp2p/go-libp2p-noise/issues/70
time.Sleep(200 * time.Millisecond)

Expand Down Expand Up @@ -158,7 +158,7 @@ func setupCapDiscovery(t *testing.T, numHosts int, numBootstrapPeers int) []*Cap
require.NoError(t, err)
// this is a workaround for the following issue
// "failed to negotiate security protocol: error reading handshake message: noise: message is too short"
// it appears simultenous connectino attempts (dht.New() attempts to connect) causes this handshake error.
// it appears simultaneous connection attempts (dht.New() attempts to connect) causes this handshake error.
// https://github.com/libp2p/go-libp2p-noise/issues/70
time.Sleep(200 * time.Millisecond)

Expand Down
42 changes: 36 additions & 6 deletions network/p2p/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,47 @@

import (
"net/http"
"sync"

"github.com/gorilla/mux"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
)

// MakeHTTPClient creates a http.Client that uses libp2p transport for a goven protocol and peer address.
func MakeHTTPClient(protocolID string, addrInfo peer.AddrInfo) (http.Client, error) {
// algorandP2pHTTPProtocol defines a libp2p protocol name for algorand's http over p2p messages
const algorandP2pHTTPProtocol = "/algorand-http/1.0.0"

// HTTPServer is a wrapper around libp2phttp.Host that allows registering http handlers with path parameters.
type HTTPServer struct {
libp2phttp.Host
p2phttpMux *mux.Router
p2phttpMuxRegistrarOnce sync.Once
}

// MakeHTTPServer creates a new HTTPServer
func MakeHTTPServer(streamHost host.Host) *HTTPServer {
httpServer := HTTPServer{
Host: libp2phttp.Host{StreamHost: streamHost},
p2phttpMux: mux.NewRouter(),

Check warning on line 44 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L41-L44

Added lines #L41 - L44 were not covered by tests
}
return &httpServer

Check warning on line 46 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L46

Added line #L46 was not covered by tests
}

// RegisterHTTPHandler registers a http handler with a given path.
func (s *HTTPServer) RegisterHTTPHandler(path string, handler http.Handler) {
s.p2phttpMux.Handle(path, handler)
s.p2phttpMuxRegistrarOnce.Do(func() {
s.Host.SetHTTPHandlerAtPath(algorandP2pHTTPProtocol, "/", s.p2phttpMux)
})

Check warning on line 54 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L50-L54

Added lines #L50 - L54 were not covered by tests
}

// MakeHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address.
func MakeHTTPClient(addrInfo *peer.AddrInfo) (*http.Client, error) {

Check warning on line 58 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L58

Added line #L58 was not covered by tests
clientStreamHost, err := libp2p.New(libp2p.NoListenAddrs)
if err != nil {
return http.Client{}, err
return nil, err

Check warning on line 61 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L61

Added line #L61 was not covered by tests
}

client := libp2phttp.Host{StreamHost: clientStreamHost}
Expand All @@ -37,10 +67,10 @@
// to make a NamespaceRoundTripper that limits to specific URL paths.
// First, we do not want make requests when listing peers (the main MakeHTTPClient invoker).
// Secondly, this makes unit testing easier - no need to register fake handlers.
rt, err := client.NewConstrainedRoundTripper(addrInfo)
rt, err := client.NewConstrainedRoundTripper(*addrInfo)

Check warning on line 70 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L70

Added line #L70 was not covered by tests
if err != nil {
return http.Client{}, err
return nil, err

Check warning on line 72 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L72

Added line #L72 was not covered by tests
}

return http.Client{Transport: rt}, nil
return &http.Client{Transport: rt}, nil

Check warning on line 75 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L75

Added line #L75 was not covered by tests
}
3 changes: 0 additions & 3 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ type serviceImpl struct {
// AlgorandWsProtocol defines a libp2p protocol name for algorand's websockets messages
const AlgorandWsProtocol = "/algorand-ws/1.0.0"

// AlgorandP2pHTTPProtocol defines a libp2p protocol name for algorand's http over p2p messages
const AlgorandP2pHTTPProtocol = "/algorand-http/1.0.0"

const dialTimeout = 30 * time.Second

// MakeHost creates a libp2p host but does not start listening.
Expand Down
6 changes: 3 additions & 3 deletions network/p2p/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
if stream.Stat().Direction == network.DirUnknown {
n.log.Warnf("Unknown direction for a steam %s to/from %s", stream.ID(), remotePeer)
} else {
n.log.Warnf("Unexpected outgoing sream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String())
n.log.Warnf("Unexpected outgoing stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String())

Check warning on line 83 in network/p2p/streams.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/streams.go#L83

Added line #L83 was not covered by tests
}
}
n.handler(n.ctx, remotePeer, stream, incoming)
Expand All @@ -98,7 +98,7 @@
if stream.Stat().Direction == network.DirUnknown {
n.log.Warnf("streamHandler: unknown direction for a steam %s to/from %s", stream.ID(), remotePeer)
} else {
n.log.Warnf("Unexpected outgoing sream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String())
n.log.Warnf("Unexpected outgoing stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String())

Check warning on line 101 in network/p2p/streams.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/streams.go#L101

Added line #L101 was not covered by tests
}
}
n.handler(n.ctx, remotePeer, stream, incoming)
Expand Down Expand Up @@ -141,7 +141,7 @@
// a new stream created above, expected direction is outbound
incoming := stream.Stat().Direction == network.DirInbound
if incoming {
n.log.Warnf("Unexpected incoming sream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String())
n.log.Warnf("Unexpected incoming stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String())

Check warning on line 144 in network/p2p/streams.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/streams.go#L144

Added line #L144 was not covered by tests
} else {
if stream.Stat().Direction == network.DirUnknown {
n.log.Warnf("Connected: unknown direction for a steam %s to/from %s", stream.ID(), remotePeer)
Expand Down
116 changes: 116 additions & 0 deletions network/p2p/testing/httpNode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

// This package wraps and re-exports the libp2p functions on order to keep
// all go-libp2p imports in one place.

package p2p

import (
"net/http"
"testing"

"github.com/algorand/go-algorand/components/mocks"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/network/p2p"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)

// HTTPNode is a mock network node that uses libp2p and http.
type HTTPNode struct {
mocks.MockNetwork
host.Host
httpServer *p2p.HTTPServer
peers []network.Peer
tb testing.TB
genesisID string
}

// MakeHTTPNode returns a new P2PHTTPNode node.
func MakeHTTPNode(tb testing.TB) *HTTPNode {
p2pHost, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
require.NoError(tb, err)

return &HTTPNode{
Host: p2pHost,
httpServer: p2p.MakeHTTPServer(p2pHost),
tb: tb,
}
}

// RegisterHTTPHandler registers a http handler with a given path.
func (p *HTTPNode) RegisterHTTPHandler(path string, handler http.Handler) {
p.httpServer.RegisterHTTPHandler(path, handler)
}

// RegisterHandlers not implemented.
func (p *HTTPNode) RegisterHandlers(dispatch []network.TaggedMessageHandler) {}

// Start starts http service
func (p *HTTPNode) Start() error {
go func() {
err := p.httpServer.Serve()
require.NoError(p.tb, err)
}()
return nil
}

// Stop stops http service
func (p *HTTPNode) Stop() {
p.httpServer.Close()
p.Host.Close()
}

// GetGenesisID returns genesisID
func (p *HTTPNode) GetGenesisID() string { return p.genesisID }

// SetGenesisID sets genesisID
func (p *HTTPNode) SetGenesisID(genesisID string) { p.genesisID = genesisID }

type httpPeer struct {
addrInfo peer.AddrInfo
tb testing.TB
}

// GetAddress implements HTTPPeer interface returns the address of the peer
func (p httpPeer) GetAddress() string {
mas, err := peer.AddrInfoToP2pAddrs(&p.addrInfo)
require.NoError(p.tb, err)
require.Len(p.tb, mas, 1)
return mas[0].String()
}

// GetAddress implements HTTPPeer interface and returns the http client for a peer
func (p httpPeer) GetHTTPClient() *http.Client {
c, err := p2p.MakeHTTPClient(&p.addrInfo)
require.NoError(p.tb, err)
return c
}

// SetPeers sets peers
func (p *HTTPNode) SetPeers(other *HTTPNode) {
addrInfo := peer.AddrInfo{ID: other.ID(), Addrs: other.Addrs()}
hpeer := httpPeer{addrInfo, p.tb}
p.peers = append(p.peers, hpeer)
}

// GetPeers returns peers
func (p *HTTPNode) GetPeers(options ...network.PeerOption) []network.Peer {
return p.peers
}
Loading
Loading