Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Example of echo server using routed hosts with local and ipfs bootstrap #278

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions examples/routed-echo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Routed Host: echo client/server

This example is intended to follow up the basic host and echo examples by adding use of the ipfs distributed hash table to lookup peers.

Functionally, this example works similarly to the echo example, however setup of the host includes wrapping it with a Kademila hash table, so it can find peers using only their IDs.

We'll also enable NAT port mapping to illustrate the setup, although it isn't guaranteed to actually be used to make the connections. Additionally, this example uses the newer `bhost.NewHost` constructor.

## Build

From `go-libp2p` base folder:

```
> make deps
> go build ./examples/routed-echo
```

## Usage


```
> ./routed-echo -l 10000
2018/02/19 12:22:32 I can be reached at:
2018/02/19 12:22:32 /ip4/127.0.0.1/tcp/10000/ipfs/QmfRY4vuKpU2tApACrbmYFn9xoeNzMQhLXg7nKnyvnzHeL
2018/02/19 12:22:32 /ip4/192.168.1.203/tcp/10000/ipfs/QmfRY4vuKpU2tApACrbmYFn9xoeNzMQhLXg7nKnyvnzHeL
2018/02/19 12:22:32 Now run "./routed-echo -l 10001 -d QmfRY4vuKpU2tApACrbmYFn9xoeNzMQhLXg7nKnyvnzHeL" on a different terminal
2018/02/19 12:22:32 listening for connections
```

The listener libp2p host will print its randomly generated Base58 encoded ID string, which combined with the ipfs DHT, can be used to reach the host, despite lacking other connection details. By default, this example will bootstrap off your local IPFS peer (assuming one is running). If you'd rather bootstrap off the same peers go-ipfs uses, pass the `-global` flag in both terminals.

Now, launch another node that talks to the listener:

```
> ./routed-echo -l 10001 -d QmfRY4vuKpU2tApACrbmYFn9xoeNzMQhLXg7nKnyvnzHeL
```

As in other examples, the new node will send the message `"Hello, world!"` to the listener, which will in turn echo it over the stream and close it. The listener logs the message, and the sender logs the response.

## Details

The `makeRoutedHost()` function creates a [go-libp2p routedhost](https://godoc.org/github.com/libp2p/go-libp2p/p2p/host/routed) object. `routedhost` objects wrap [go-libp2p basichost](https://godoc.org/github.com/libp2p/go-libp2p/p2p/host/basic) and add the ability to lookup a peers address using the ipfs distributed hash table as implemented by [go-libp2p-kad-dht](https://godoc.org/github.com/libp2p/go-libp2p-kad-dht).

In order to create the routed host, the example needs:

- A [go-libp2p basichost](https://godoc.org/github.com/libp2p/go-libp2p/p2p/host/basic) as in other examples.
- A [go-libp2p-kad-dht](https://godoc.org/github.com/libp2p/go-libp2p-kad-dht) which provides the ability to lookup peers by ID. Wrapping takes place via `routedHost := rhost.Wrap(basicHost, dht)`

A `routedhost` can now open streams (bi-directional channel between to peers) using [NewStream](https://godoc.org/github.com/libp2p/go-libp2p/p2p/host/basic#BasicHost.NewStream) and use them to send and receive data tagged with a `Protocol.ID` (a string). The host can also listen for incoming connections for a given
`Protocol` with [`SetStreamHandle()`](https://godoc.org/github.com/libp2p/go-libp2p/p2p/host/basic#BasicHost.SetStreamHandler). The advantage of the routed host is that only the Peer ID is required to make the connection, not the underlying address details, since they are provided by the DHT.

The example makes use of all of this to enable communication between a listener and a sender using protocol `/echo/1.0.0` (which could be any other thing).
128 changes: 128 additions & 0 deletions examples/routed-echo/bootstrap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"

host "github.com/libp2p/go-libp2p-host"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
)

var (
IPFS_PEERS = convertPeers([]string{
"/ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
"/ip4/104.236.179.241/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM",
"/ip4/128.199.219.111/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu",
"/ip4/104.236.76.40/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64",
"/ip4/178.62.158.247/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd",
"/ip6/2604:a880:1:20::203:d001/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM",
"/ip6/2400:6180:0:d0::151:6001/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu",
"/ip6/2604:a880:800:10::4a:5001/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64",
"/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd",
})
LOCAL_PEER_ENDPOINT = "http://localhost:5001/api/v0/id"
)

// Borrowed from ipfs code to parse the results of the command `ipfs id`
type IdOutput struct {
ID string
PublicKey string
Addresses []string
AgentVersion string
ProtocolVersion string
}

// quick and dirty function to get the local ipfs daemons address for bootstrapping
func getLocalPeerInfo() []pstore.PeerInfo {
resp, err := http.Get(LOCAL_PEER_ENDPOINT)
if err != nil {
log.Fatalln(err)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatalln(err)
}
var js IdOutput
err = json.Unmarshal(body, &js)
if err != nil {
log.Fatalln(err)
}
for _, addr := range js.Addresses {
// For some reason, possibly NAT traversal, we need to grab the loopback ip address
if addr[0:8] == "/ip4/127" {
return convertPeers([]string{addr})
}
}
log.Fatalln(err)
return make([]pstore.PeerInfo, 1) // not reachable, but keeps the compiler happy
}

func convertPeers(peers []string) []pstore.PeerInfo {
pinfos := make([]pstore.PeerInfo, len(peers))
for i, peer := range peers {
maddr := ma.StringCast(peer)
p, err := pstore.InfoFromP2pAddr(maddr)
if err != nil {
log.Fatalln(err)
}
pinfos[i] = *p
}
return pinfos
}

// This code is borrowed from the go-ipfs bootstrap process
func bootstrapConnect(ctx context.Context, ph host.Host, peers []pstore.PeerInfo) error {
if len(peers) < 1 {
return errors.New("not enough bootstrap peers")
}

errs := make(chan error, len(peers))
var wg sync.WaitGroup
for _, p := range peers {

// performed asynchronously because when performed synchronously, if
// one `Connect` call hangs, subsequent calls are more likely to
// fail/abort due to an expiring context.
// Also, performed asynchronously for dial speed.

wg.Add(1)
go func(p pstore.PeerInfo) {
defer wg.Done()
defer log.Println(ctx, "bootstrapDial", ph.ID(), p.ID)
log.Printf("%s bootstrapping to %s", ph.ID(), p.ID)

ph.Peerstore().AddAddrs(p.ID, p.Addrs, pstore.PermanentAddrTTL)
if err := ph.Connect(ctx, p); err != nil {
log.Println(ctx, "bootstrapDialFailed", p.ID)
log.Printf("failed to bootstrap with %v: %s", p.ID, err)
errs <- err
return
}
log.Println(ctx, "bootstrapDialSuccess", p.ID)
log.Printf("bootstrapped with %v", p.ID)
}(p)
}
wg.Wait()

// our failure condition is when no connection attempt succeeded.
// So drain the errs channel, counting the results.
close(errs)
count := 0
var err error
for err = range errs {
if err != nil {
count++
}
}
if count == len(peers) {
return fmt.Errorf("failed to bootstrap. %s", err)
}
return nil
}
214 changes: 214 additions & 0 deletions examples/routed-echo/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package main

import (
"bufio"
"context"
"crypto/rand"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
mrand "math/rand"

ds "github.com/ipfs/go-datastore"
dsync "github.com/ipfs/go-datastore/sync"
golog "github.com/ipfs/go-log"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
dht "github.com/libp2p/go-libp2p-kad-dht"
net "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
swarm "github.com/libp2p/go-libp2p-swarm"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
rhost "github.com/libp2p/go-libp2p/p2p/host/routed"
ma "github.com/multiformats/go-multiaddr"
gologging "github.com/whyrusleeping/go-logging"
)

// makeRoutedHost creates a LibP2P host with a random peer ID listening on the
// given multiaddress. It will use secio if secio is true. It will bootstrap using the
// provided PeerInfo
func makeRoutedHost(listenPort int, randseed int64, bootstrapPeers []pstore.PeerInfo, globalFlag string) (host.Host, error) {

// If the seed is zero, use real cryptographic randomness. Otherwise, use a
// deterministic randomness source to make generated keys stay the same
// across multiple runs
var r io.Reader
if randseed == 0 {
r = rand.Reader
} else {
r = mrand.New(mrand.NewSource(randseed))
}

// Generate a key pair for this host. We will use it at least
// to obtain a valid host ID.
priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
if err != nil {
return nil, err
}

// Get the peer id
pid, err := peer.IDFromPrivateKey(priv)
if err != nil {
return nil, err
}

maddr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", listenPort))
if err != nil {
return nil, err
}

// We've created the identity, now we need to store it to use the bhost constructors
ps := pstore.NewPeerstore()
ps.AddPrivKey(pid, priv)
ps.AddPubKey(pid, priv.GetPublic())

// Put all this together
ctx := context.Background()
netw, err := swarm.NewNetwork(ctx, []ma.Multiaddr{maddr}, pid, ps, nil)
if err != nil {
return nil, err
}

hostOpts := &bhost.HostOpts{
NATManager: bhost.NewNATManager(netw),
}

basicHost, err := bhost.NewHost(ctx, netw, hostOpts)
if err != nil {
return nil, err
}

// Construct a datastore (needed by the DHT). This is just a simple, in-memory thread-safe datastore.
dstore := dsync.MutexWrap(ds.NewMapDatastore())

// Make the DHT
dht := dht.NewDHT(ctx, basicHost, dstore)

// Make the routed host
routedHost := rhost.Wrap(basicHost, dht)

// connect to the chosen ipfs nodes
err = bootstrapConnect(ctx, routedHost, bootstrapPeers)
if err != nil {
return nil, err
}

// Bootstrap the host
err = dht.Bootstrap(ctx)
if err != nil {
return nil, err
}

// Build host multiaddress
hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", routedHost.ID().Pretty()))

// Now we can build a full multiaddress to reach this host
// by encapsulating both addresses:
// addr := routedHost.Addrs()[0]
addrs := routedHost.Addrs()
log.Println("I can be reached at:")
for _, addr := range addrs {
log.Println(addr.Encapsulate(hostAddr))
}

log.Printf("Now run \"./routed-echo -l %d -d %s%s\" on a different terminal\n", listenPort+1, routedHost.ID().Pretty(), globalFlag)

return routedHost, nil
}

func main() {
// LibP2P code uses golog to log messages. They log with different
// string IDs (i.e. "swarm"). We can control the verbosity level for
// all loggers with:
golog.SetAllLoggers(gologging.INFO) // Change to DEBUG for extra info

// Parse options from the command line
listenF := flag.Int("l", 0, "wait for incoming connections")
target := flag.String("d", "", "target peer to dial")
seed := flag.Int64("seed", 0, "set random seed for id generation")
global := flag.Bool("global", false, "use global ipfs peers for bootstrapping")
flag.Parse()

if *listenF == 0 {
log.Fatal("Please provide a port to bind on with -l")
}

// Make a host that listens on the given multiaddress
var bootstrapPeers []pstore.PeerInfo
var globalFlag string
if *global {
log.Println("using global bootstrap")
bootstrapPeers = IPFS_PEERS
globalFlag = " -global"
} else {
log.Println("using local bootstrap")
bootstrapPeers = getLocalPeerInfo()
globalFlag = ""
}
ha, err := makeRoutedHost(*listenF, *seed, bootstrapPeers, globalFlag)
if err != nil {
log.Fatal(err)
}

// Set a stream handler on host A. /echo/1.0.0 is
// a user-defined protocol name.
ha.SetStreamHandler("/echo/1.0.0", func(s net.Stream) {
log.Println("Got a new stream!")
if err := doEcho(s); err != nil {
log.Println(err)
s.Reset()
} else {
s.Close()
}
})

if *target == "" {
log.Println("listening for connections")
select {} // hang forever
}
/**** This is where the listener code ends ****/

peerid, err := peer.IDB58Decode(*target)
if err != nil {
log.Fatalln(err)
}

// peerinfo := pstore.PeerInfo{ID: peerid}
log.Println("opening stream")
// make a new stream from host B to host A
// it should be handled on host A by the handler we set above because
// we use the same /echo/1.0.0 protocol
s, err := ha.NewStream(context.Background(), peerid, "/echo/1.0.0")

if err != nil {
log.Fatalln(err)
}

_, err = s.Write([]byte("Hello, world!\n"))
if err != nil {
log.Fatalln(err)
}

out, err := ioutil.ReadAll(s)
if err != nil {
log.Fatalln(err)
}

log.Printf("read reply: %q\n", out)
}

// doEcho reads a line of data from a stream and writes it back
func doEcho(s net.Stream) error {
buf := bufio.NewReader(s)
str, err := buf.ReadString('\n')
if err != nil {
return err
}

log.Printf("read: %s\n", str)
_, err = s.Write([]byte(str))
return err
}
Loading