From dbc22690e5e16617342bb4679fc4bc72acb215d2 Mon Sep 17 00:00:00 2001 From: zelig Date: Sun, 21 Dec 2014 19:09:47 +0000 Subject: [PATCH 01/17] initial commit of p2p peer selection/cademlia --- p2p/TODO | 3 + p2p/peer_selector.go | 163 +++++++++++++++++++++++++++++++++++++++++++ p2p/server.go | 21 +++++- 3 files changed, 184 insertions(+), 3 deletions(-) create mode 100644 p2p/TODO create mode 100644 p2p/peer_selector.go diff --git a/p2p/TODO b/p2p/TODO new file mode 100644 index 000000000000..ad8ce170be3d --- /dev/null +++ b/p2p/TODO @@ -0,0 +1,3 @@ +Define peer address' time field and lastSeen function. +Overwrite time field when connect and disconnect. +Change protocol getPeers message response conditional on target. peerList if noTarget else server.getPeers diff --git a/p2p/peer_selector.go b/p2p/peer_selector.go new file mode 100644 index 000000000000..e60b19eee869 --- /dev/null +++ b/p2p/peer_selector.go @@ -0,0 +1,163 @@ +package p2p + +import ( + "time" +) + +type PeerSelector interface { + SuggestPeer(addr *peerAddr) (ok bool) + // AddPeer(addr *peerAddr) (ok bool) + GetPeers(target []byte) []*peerAddr + Start() + Stop() +} + +type BaseSelector struct { + DirPath string +} + +func (self *BaseSelector) SuggestPeer(addr *peerAddr) bool { + return true +} + +const ( + hashBits = 160 + rowLength = 10 + maxAge = 1 +) + +type Cademlia struct { + rows [hashBits]*row + hashBits int + rowLength int + maxAge time.Duration + index map[string]*peerData +} + +type row struct { + length int + row []*peerData + lock sync.RWMutex +} + +func (self *row) addresses() (addrs []*peerAddr) { + self.lock.RLock() + defer self.lock.RUnlock() + for _, p := range self.row { + addrs = append(addrs, p.addr) + } + return +} + +func (self *row) insert(addr *peerAddr) (ok bool) { + self.lock.Lock() + defer self.lock.Unlock() + peerData := &peerData{addr: addr} + if len(self.row) >= self.length { + self.row[self.worst()] = peerData + } else { + self.row = append(self.row, peerData) + ok = true + } + return +} + +func (self *row) worst() (index int) { + var oldest time.Time + for i, p := range self.row { + if oldest == nil || p.addr.LastSeen().Before(oldest) { + oldest = p.addr.LastSeen + index = i + } + } + return +} + +func (self *row) purge(maxAge time.Time) { + var newRow []*peerData + for _, p := range self.row { + if !p.addr.LastSeen().Before(maxAge) { + newRow = append(newRow, p) + } + } +} + +type peerData struct { + addr *peerAddr + hash []byte +} + +func Hash([]byte) []byte { + +} + +func (self *Cademlia) prefixLength(other []byte) { + +} + +func newCademlia() *Cademlia { + return &Cademlia{ + hashBits: hashBits, + rowLength: rowLength, + maxAge: maxAge * time.Second, + rows: make([hashBits]*row), + index: make(map[string]*peerData), + } +} + +func (self *Cademlia) Start() { + go self.purgeLoop() +} + +func (self *Cademlia) purgeLoop() { + ticker := time.Tick(self.purgeInterval) + for { + select { + case <-ticker: + for _, r := range self.rows { + r.purge(time.Since(self.maxAge)) + } + } + } +} + +func (self *Cademlia) SuggestPeer(addr *peerAddr) bool { + index := self.commonPrefixLength(Hash(addr.Pubkey)) + row := self.rows[index] + longer := row.insert(addr) + if index >= self.depth && longer { + self.updateDepth() + } + return longer +} + +func (self *Cademlia) GetPeers(target []byte) (peers []*peerAddr) { + index := self.prefixLength(target) + if index >= self.depth { + for i := self.depth; i < hashBits; i++ { + peers = append(peers, self.rows[i].addresses()) + } + } else { + peers = self.rows[index].addresses() + } + return +} + +func Xor(one, other []byte) (xor []byte) { + for i := 0; i < len(one); i++ { + xor[i] = one[i] ^ other[i] + } + return +} + +func (self *Cademlia) commonPrefixLength(other []byte) (ret int) { + xor := Xor(self.hash, other) + for i := 0; i < len(self.hash); i++ { + for j := 0; j < 8; j++ { + if (xor[i]>>uint8(7-j))&0x1 != 0 { + return i*8 + j + } + } + } + return len(self.hash)*8 - 1 +} diff --git a/p2p/server.go b/p2p/server.go index 326781234313..e91a83061c1d 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -62,6 +62,9 @@ type Server struct { // If NoDial is true, the server will not dial any peers. NoDial bool + // peer selector + PeerSelector PeerSelector + // Hook for testing. This is useful because we can inhibit // the whole protocol stack. newPeerFunc peerFunc @@ -104,6 +107,10 @@ func (srv *Server) Peers() (peers []*Peer) { return } +func (srv *Server) GetPeers(target []byte) (peers []*Peer) { + // delegate to selector +} + // PeerCount returns the number of connected peers. func (srv *Server) PeerCount() int { srv.lock.RLock() @@ -113,9 +120,13 @@ func (srv *Server) PeerCount() int { // SuggestPeer injects an address into the outbound address pool. func (srv *Server) SuggestPeer(ip net.IP, port int, nodeID []byte) { - select { - case srv.peerConnect <- &peerAddr{ip, uint64(port), nodeID}: - default: // don't block + addr := &peerAddr{ip, uint64(port), nodeID, time.Now()} + ok := srv.PeerSelector.SuggestPeer(addr) + if ok { + select { + case srv.peerConnect <- addr: + default: // don't block + } } } @@ -185,6 +196,10 @@ func (srv *Server) Start() (err error) { srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.") } + if srv.PeerSelector == nil { + srv.PeerSelector = &BaseSelector{} + } + // make all slots available for i := range srv.peers { srv.peerSlots <- i From bdb4f9dd83668edb5e5c93f9f2c5b3d50fbc2709 Mon Sep 17 00:00:00 2001 From: zelig Date: Tue, 6 Jan 2015 01:56:26 +0000 Subject: [PATCH 02/17] peer selection - SuggestPeer lookup convenience method from backend to server - now takes (host, pubkey) arguments - earlier suggest peer -> AddPeer which now calls the peer selector - delete peer_util (save, restore functionality belongs to peer selector) - peerRecord implements peerInfo (a record entry object for peer info) - introduce lastActive timestamping via the protocol pingpong loop - instead of connect channel, peer simply takes 2 functions (addPeer, getPeers) this is inline with blockPool etc - protocol.PeerList now many phases (server gives active peers including own address, selector proximity peers, and protocol filters for recipient peer and does encoding) - fix and simplify peer_selector --- cmd/mist/ui_lib.go | 2 +- eth/backend.go | 15 +- eth/peer_util.go | 23 --- javascript/javascript_runtime.go | 2 +- p2p/TODO | 2 - p2p/peer.go | 80 ++++++---- p2p/peer_selector.go | 252 +++++++++++++++++++++---------- p2p/protocol.go | 43 ++++-- p2p/protocol_test.go | 21 ++- p2p/server.go | 62 ++++++-- 10 files changed, 324 insertions(+), 178 deletions(-) delete mode 100644 eth/peer_util.go diff --git a/cmd/mist/ui_lib.go b/cmd/mist/ui_lib.go index 0aabb87d0951..843c168de366 100644 --- a/cmd/mist/ui_lib.go +++ b/cmd/mist/ui_lib.go @@ -195,7 +195,7 @@ func (ui *UiLib) Connect(button qml.Object) { } func (ui *UiLib) ConnectToPeer(addr string) { - if err := ui.eth.SuggestPeer(addr); err != nil { + if err := ui.eth.SuggestPeer(addr, []byte{}); err != nil { guilogger.Infoln(err) } } diff --git a/eth/backend.go b/eth/backend.go index 065a4f7d84fd..7f397eef4102 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -2,7 +2,6 @@ package eth import ( "fmt" - "net" "sync" "github.com/ethereum/go-ethereum/core" @@ -21,6 +20,8 @@ const ( seedNodeAddress = "poc-7.ethdev.com:30300" ) +var seednodeId []byte = nil + type Config struct { Name string Version string @@ -248,7 +249,7 @@ func (s *Ethereum) Start(seed bool) error { // TODO: read peers here if seed { logger.Infof("Connect to seed node %v", seedNodeAddress) - if err := s.SuggestPeer(seedNodeAddress); err != nil { + if err := s.SuggestPeer(seedNodeAddress, seednodeId); err != nil { return err } } @@ -257,14 +258,8 @@ func (s *Ethereum) Start(seed bool) error { return nil } -func (self *Ethereum) SuggestPeer(addr string) error { - netaddr, err := net.ResolveTCPAddr("tcp", addr) - if err != nil { - logger.Errorf("couldn't resolve %s:", addr, err) - return err - } - - self.net.SuggestPeer(netaddr.IP, netaddr.Port, nil) +func (self *Ethereum) SuggestPeer(addr string, pubkey []byte) error { + self.net.SuggestPeer(addr, pubkey) return nil } diff --git a/eth/peer_util.go b/eth/peer_util.go deleted file mode 100644 index 6cf80cde29f9..000000000000 --- a/eth/peer_util.go +++ /dev/null @@ -1,23 +0,0 @@ -package eth - -import ( - "encoding/json" - - "github.com/ethereum/go-ethereum/ethutil" -) - -func WritePeers(path string, addresses []string) { - if len(addresses) > 0 { - data, _ := json.MarshalIndent(addresses, "", " ") - ethutil.WriteFile(path, data) - } -} - -func ReadPeers(path string) (ips []string, err error) { - var data string - data, err = ethutil.ReadAllFile(path) - if err != nil { - json.Unmarshal([]byte(data), &ips) - } - return -} diff --git a/javascript/javascript_runtime.go b/javascript/javascript_runtime.go index af1405049f30..d0ccc9b2b2d1 100644 --- a/javascript/javascript_runtime.go +++ b/javascript/javascript_runtime.go @@ -203,7 +203,7 @@ func (self *JSRE) addPeer(call otto.FunctionCall) otto.Value { if err != nil { return otto.FalseValue() } - self.ethereum.SuggestPeer(host) + self.ethereum.SuggestPeer(host, nil) return otto.TrueValue() } diff --git a/p2p/TODO b/p2p/TODO index ad8ce170be3d..cd5891b7669e 100644 --- a/p2p/TODO +++ b/p2p/TODO @@ -1,3 +1 @@ -Define peer address' time field and lastSeen function. -Overwrite time field when connect and disconnect. Change protocol getPeers message response conditional on target. peerList if noTarget else server.getPeers diff --git a/p2p/peer.go b/p2p/peer.go index 0d7eec9f46a1..de21f60dd967 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -45,8 +45,48 @@ func (d peerAddr) String() string { return fmt.Sprintf("%v:%d", d.IP, d.Port) } -func (d *peerAddr) RlpData() interface{} { - return []interface{}{string(d.IP), d.Port, d.Pubkey} +func (d peerAddr) RlpData() interface{} { + return []interface{}{d.IP, d.Port, d.Pubkey} +} + +type peerRecord struct { + addr *peerAddr + hash []byte + lastActive time.Time + lastActiveC chan time.Time + peer *Peer +} + +func (self *peerRecord) Addr() *peerAddr { + return self.addr +} + +func (self *peerRecord) Hash() []byte { + if self.hash == nil { + self.hash = Hash(self.addr.Pubkey) + } + return self.hash +} + +func (self *peerRecord) LastActive() (lastActive time.Time) { + var ok bool + select { + case lastActive, ok = <-self.lastActiveC: + if ok { + self.lastActive = lastActive + } + default: + lastActive = self.lastActive + } + return +} + +func (self *peerRecord) Connect() error { + return nil +} + +func (self *peerRecord) Disconnect() error { + return nil } // Peer represents a remote peer. @@ -85,11 +125,11 @@ type Peer struct { // These fields are kept so base protocol can access them. // TODO: this should be one or more interfaces - ourID ClientIdentity // client id of the Server - ourListenAddr *peerAddr // listen addr of Server, nil if not listening - newPeerAddr chan<- *peerAddr // tell server about received peers - otherPeers func() []*Peer // should return the list of all peers - pubkeyHook func(*peerAddr) error // called at end of handshake to validate pubkey + ourID ClientIdentity // client id of the Server + ourListenAddr *peerAddr // listen addr of Server, nil if not listening + addPeer func(*peerAddr) error // tell server about received peers + getPeers func(...[]byte) []*peerAddr // should return the list of all peers + pubkeyHook func(*peerAddr) error // called at end of handshake to validate pubkey } // NewPeer returns a peer for testing purposes. @@ -104,8 +144,8 @@ func NewPeer(id ClientIdentity, caps []Cap) *Peer { func newServerPeer(server *Server, conn net.Conn, dialAddr *peerAddr) *Peer { p := newPeer(conn, server.Protocols, dialAddr) p.ourID = server.Identity - p.newPeerAddr = server.peerConnect - p.otherPeers = server.Peers + p.addPeer = server.AddPeer + p.getPeers = server.GetPeers p.pubkeyHook = server.verifyPeer p.runBaseProtocol = true @@ -460,25 +500,3 @@ func (r *eofSignal) Read(buf []byte) (int, error) { } return n, err } - -func (peer *Peer) PeerList() []interface{} { - peers := peer.otherPeers() - ds := make([]interface{}, 0, len(peers)) - for _, p := range peers { - p.infolock.Lock() - addr := p.listenAddr - p.infolock.Unlock() - // filter out this peer and peers that are not listening or - // have not completed the handshake. - // TODO: track previously sent peers and exclude them as well. - if p == peer || addr == nil { - continue - } - ds = append(ds, addr) - } - ourAddr := peer.ourListenAddr - if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { - ds = append(ds, ourAddr) - } - return ds -} diff --git a/p2p/peer_selector.go b/p2p/peer_selector.go index e60b19eee869..81c644ad408c 100644 --- a/p2p/peer_selector.go +++ b/p2p/peer_selector.go @@ -1,23 +1,85 @@ package p2p import ( + "encoding/json" + "fmt" + "path" + "sync" "time" + + "github.com/ethereum/go-ethereum/ethutil" ) -type PeerSelector interface { - SuggestPeer(addr *peerAddr) (ok bool) - // AddPeer(addr *peerAddr) (ok bool) - GetPeers(target []byte) []*peerAddr - Start() - Stop() +type peerInfo interface { + Addr() *peerAddr + Hash() []byte + // Pubkey() []byte + LastActive() time.Time + Disconnect() error + Connect() error +} + +type peerSelector interface { + AddPeer(peer peerInfo) error + GetPeers(target ...[]byte) []*peerAddr + Start() error + Stop() error } type BaseSelector struct { - DirPath string + DirPath string + getPeers func() []*peerAddr + peers []peerInfo +} + +func (self *BaseSelector) AddPeer(peer peerInfo) error { + return nil +} + +func (self *BaseSelector) GetPeers(target ...[]byte) []*peerAddr { + return self.getPeers() } -func (self *BaseSelector) SuggestPeer(addr *peerAddr) bool { - return true +func (self *BaseSelector) Start() error { + if len(self.DirPath) > 0 { + path := path.Join(self.DirPath, "peers.json") + peers, err := ReadPeers(path) + if err != nil { + return err + } + self.peers = peers + } + return nil +} + +func (self *BaseSelector) Stop() error { + if len(self.DirPath) > 0 { + path := path.Join(self.DirPath, "peers.json") + if err := WritePeers(path, self.peers); err != nil { + return err + } + } + return nil +} + +func WritePeers(path string, addresses []peerInfo) error { + if len(addresses) > 0 { + data, err := json.MarshalIndent(addresses, "", " ") + if err == nil { + ethutil.WriteFile(path, data) + } + return err + } + return nil +} + +func ReadPeers(path string) (peers []peerInfo, err error) { + var data string + data, err = ethutil.ReadAllFile(path) + if err == nil { + json.Unmarshal([]byte(data), &peers) + } + return } const ( @@ -27,122 +89,150 @@ const ( ) type Cademlia struct { - rows [hashBits]*row + hash []byte hashBits int rowLength int - maxAge time.Duration - index map[string]*peerData + // index map[string]peerInfo + rows [hashBits]*row + + depth int + + maxAge time.Duration + purgeInterval time.Duration + + lock sync.RWMutex + quitC chan bool } -type row struct { - length int - row []*peerData - lock sync.RWMutex +func newCademlia(hash []byte) *Cademlia { + return &Cademlia{ + hash: hash, + hashBits: hashBits, + rowLength: rowLength, + maxAge: maxAge * time.Second, + rows: [hashBits]*row{}, + // index: make(map[string]peerInfo), + } } -func (self *row) addresses() (addrs []*peerAddr) { - self.lock.RLock() - defer self.lock.RUnlock() - for _, p := range self.row { - addrs = append(addrs, p.addr) +func (self *Cademlia) Start() error { + self.lock.Lock() + defer self.lock.Unlock() + if self.quitC != nil { + return nil } - return + self.quitC = make(chan bool) + go self.purgeLoop() + return nil } -func (self *row) insert(addr *peerAddr) (ok bool) { +func (self *Cademlia) Stop() { self.lock.Lock() defer self.lock.Unlock() - peerData := &peerData{addr: addr} - if len(self.row) >= self.length { - self.row[self.worst()] = peerData - } else { - self.row = append(self.row, peerData) - ok = true + if self.quitC == nil { + return } - return + close(self.quitC) + self.quitC = nil } -func (self *row) worst() (index int) { - var oldest time.Time - for i, p := range self.row { - if oldest == nil || p.addr.LastSeen().Before(oldest) { - oldest = p.addr.LastSeen - index = i +func (self *Cademlia) AddPeer(peer peerInfo) (err error) { + index := self.commonPrefixLength(peer.Hash()) + row := self.rows[index] + needed := row.insert(&entry{peer: peer}) + if needed { + if index >= self.depth { + self.updateDepth() } + } else { + err = fmt.Errorf("no worse peer found") } return } -func (self *row) purge(maxAge time.Time) { - var newRow []*peerData - for _, p := range self.row { - if !p.addr.LastSeen().Before(maxAge) { - newRow = append(newRow, p) +func (self *Cademlia) GetPeers(target []byte) (peers []*peerAddr) { + index := self.commonPrefixLength(target) + var entries []*entry + if index >= self.depth { + for i := self.depth; i < self.hashBits; i++ { + entries = append(entries, self.rows[i].row...) } + } else { + entries = self.rows[index].row } -} -type peerData struct { - addr *peerAddr - hash []byte + for _, entry := range entries { + peers = append(peers, entry.peer.Addr()) + } + return } -func Hash([]byte) []byte { +type entry struct { + peer peerInfo + // metadata +} +type row struct { + length int + row []*entry + lock sync.RWMutex } -func (self *Cademlia) prefixLength(other []byte) { +func (self *row) insert(entry *entry) (ok bool) { + self.lock.Lock() + defer self.lock.Unlock() + if len(self.row) >= self.length { + self.row[self.worst()] = entry + } else { + self.row = append(self.row, entry) + ok = true + } + return +} +func (self *row) worst() (index int) { + var oldest time.Time + for i, entry := range self.row { + if (oldest == time.Time{}) || entry.peer.LastActive().Before(oldest) { + oldest = entry.peer.LastActive() + index = i + } + } + return } -func newCademlia() *Cademlia { - return &Cademlia{ - hashBits: hashBits, - rowLength: rowLength, - maxAge: maxAge * time.Second, - rows: make([hashBits]*row), - index: make(map[string]*peerData), +func (self *row) purge(recently time.Time) { + var newRow []*entry + for _, entry := range self.row { + if !entry.peer.LastActive().Before(recently) { + newRow = append(newRow, entry) + } else { + entry.peer.Disconnect() + } } } -func (self *Cademlia) Start() { - go self.purgeLoop() +func Hash(key []byte) []byte { + return key +} + +func (self *Cademlia) updateDepth() { } func (self *Cademlia) purgeLoop() { ticker := time.Tick(self.purgeInterval) for { select { + case <-self.quitC: + return case <-ticker: for _, r := range self.rows { - r.purge(time.Since(self.maxAge)) + r.purge(time.Now().Add(-self.maxAge)) } } } } -func (self *Cademlia) SuggestPeer(addr *peerAddr) bool { - index := self.commonPrefixLength(Hash(addr.Pubkey)) - row := self.rows[index] - longer := row.insert(addr) - if index >= self.depth && longer { - self.updateDepth() - } - return longer -} - -func (self *Cademlia) GetPeers(target []byte) (peers []*peerAddr) { - index := self.prefixLength(target) - if index >= self.depth { - for i := self.depth; i < hashBits; i++ { - peers = append(peers, self.rows[i].addresses()) - } - } else { - peers = self.rows[index].addresses() - } - return -} - func Xor(one, other []byte) (xor []byte) { for i := 0; i < len(one); i++ { xor[i] = one[i] ^ other[i] @@ -152,12 +242,12 @@ func Xor(one, other []byte) (xor []byte) { func (self *Cademlia) commonPrefixLength(other []byte) (ret int) { xor := Xor(self.hash, other) - for i := 0; i < len(self.hash); i++ { + for i := 0; i < self.hashBits; i++ { for j := 0; j < 8; j++ { if (xor[i]>>uint8(7-j))&0x1 != 0 { return i*8 + j } } } - return len(self.hash)*8 - 1 + return self.hashBits*8 - 1 } diff --git a/p2p/protocol.go b/p2p/protocol.go index dd8cbc4ecdbc..356a89413afa 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -105,12 +105,16 @@ func runBaseProtocol(peer *Peer, rw MsgReadWriter) error { } } }() - return bp.loop(errc) + var lastActiveC chan time.Time + if bp.peer.listenAddr != nil { + lastActiveC = bp.peer.listenAddr.lastActiveC + } + return bp.loop(errc, lastActiveC) } var pingTimeout = 2 * time.Second -func (bp *baseProtocol) loop(quit <-chan error) error { +func (bp *baseProtocol) loop(quit <-chan error, lastActiveC chan time.Time) error { ping := time.NewTimer(pingTimeout) activity := bp.peer.activity.Subscribe(time.Time{}) lastActive := time.Time{} @@ -125,6 +129,7 @@ func (bp *baseProtocol) loop(quit <-chan error) error { select { case err = <-quit: return err + case lastActiveC <- lastActive: case <-getPeersTick.C: err = bp.rw.EncodeMsg(getPeersMsg) case event := <-activity.Chan(): @@ -169,15 +174,29 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { case pongMsg: case getPeersMsg: - peers := bp.peer.PeerList() - // this is dangerous. the spec says that we should _delay_ - // sending the response if no new information is available. - // this means that would need to send a response later when - // new peers become available. - // - // TODO: add event mechanism to notify baseProtocol for new peers - if len(peers) > 0 { - return bp.rw.EncodeMsg(peersMsg, peers...) + var target [][]byte + if err := msg.Decode(&target); err != nil { + return newPeerError(errInvalidMsg, "%v", err) + } + + peers := bp.peer.getPeers(target...) + if len(target) == 0 { + // then add ourselves to the list + ourAddr := bp.peer.ourListenAddr + if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { + peers = append(peers, ourAddr) + } + } + ds := make([]interface{}, 0, len(peers)) + // encode and filter out requesting peer + for _, addr := range peers { + if addr != bp.peer.listenAddr { + ds = append(ds, addr) + } + } + + if len(ds) > 0 { + return bp.rw.EncodeMsg(peersMsg, ds...) } case peersMsg: @@ -187,7 +206,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { } for _, addr := range peers { bp.peer.Debugf("received peer suggestion: %v", addr) - bp.peer.newPeerAddr <- addr + bp.peer.addPeer(addr) } default: diff --git a/p2p/protocol_test.go b/p2p/protocol_test.go index ce25b3e1b541..38a30949570e 100644 --- a/p2p/protocol_test.go +++ b/p2p/protocol_test.go @@ -31,7 +31,7 @@ func newTestPeer() (peer *Peer) { peer.pubkeyHook = func(*peerAddr) error { return nil } peer.ourID = &peerId{} peer.listenAddr = &peerAddr{} - peer.otherPeers = func() []*Peer { return nil } + peer.getPeers = func(...[]byte) []*peerAddr { return nil } return } @@ -58,25 +58,32 @@ func TestBaseProtocolPeers(t *testing.T) { for got = range addrChan { own = append(own, got) } - if len(own) != 1 || !reflect.DeepEqual(ownAddr, own[0]) { - t.Errorf("mismatch: peers own address is incorrectly or not given, got %v, want %#v", ownAddr) + if len(own) < 1 { + t.Errorf("mismatch: peers own address not given") + } else { + if !reflect.DeepEqual(ownAddr, own[0]) { + t.Errorf("mismatch: peers own address is incorrectly or not given, got %v, want %#v", own[0], ownAddr) + } } rw2.Close() }() // run first peer peer1 := newTestPeer() peer1.ourListenAddr = ownAddr - peer1.otherPeers = func() []*Peer { - pl := make([]*Peer, len(cannedPeerList)) + peer1.getPeers = func(...[]byte) []*peerAddr { + pl := make([]*peerAddr, len(cannedPeerList)) for i, addr := range cannedPeerList { - pl[i] = &Peer{listenAddr: addr} + pl[i] = addr } return pl } go runBaseProtocol(peer1, rw1) // run second peer peer2 := newTestPeer() - peer2.newPeerAddr = addrChan // feed peer suggestions into matcher + peer2.addPeer = func(addr *peerAddr) error { + addrChan <- addr // feed peer suggestions into matcher + return nil + } if err := runBaseProtocol(peer2, rw2); err != ErrPipeClosed { t.Errorf("peer2 terminated with unexpected error: %v", err) } diff --git a/p2p/server.go b/p2p/server.go index 441eec6711a7..dff4738ad518 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -63,7 +63,7 @@ type Server struct { NoDial bool // peer selector - PeerSelector PeerSelector + PeerSelector peerSelector // Hook for testing. This is useful because we can inhibit // the whole protocol stack. @@ -107,8 +107,34 @@ func (srv *Server) Peers() (peers []*Peer) { return } -func (srv *Server) GetPeers(target []byte) (peers []*Peer) { - // delegate to selector +// GetActivePeers returns addresses of all connected peers. +func (srv *Server) GetActivePeers() (peers []*peerAddr) { + for _, peer := range srv.Peers() { + if peer != nil { + peer.infolock.Lock() + addr := peer.listenAddr + peer.infolock.Unlock() + // filter out this peer and peers that are not listening or + // have not completed the handshake. + // TODO: track previously sent peers and exclude them as well. + if addr == nil { + continue + } + peers = append(peers, addr) + } + } + return peers +} + +// GetPeers returns addresses near target if given or supported by the client +// or falls back to all actively connected peers +func (srv *Server) GetPeers(target ...[]byte) (peers []*peerAddr) { + if len(target) == 1 { // delegate to selector + srv.PeerSelector.GetPeers(target[0]) + return peers + } else { + return srv.GetActivePeers() + } } // PeerCount returns the number of connected peers. @@ -118,14 +144,30 @@ func (srv *Server) PeerCount() int { return srv.peerCount } -// SuggestPeer injects an address into the outbound address pool. -func (srv *Server) SuggestPeer(ip net.IP, port int, nodeID []byte) { - addr := &peerAddr{IP: ip, Port: uint64(port), Pubkey: nodeID} - select { - case srv.peerConnect <- addr: - default: // don't block - srvlog.Warnf("peer suggestion %v ignored", addr) +// SuggestPeer is a convenient method that does dns resolution +// and passes on the request to AddPeer +func (srv *Server) SuggestPeer(addr string, pubkey []byte) error { + netaddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + srvlog.Errorf("couldn't resolve %s:", addr, err) + return err } + peerAddr := &peerAddr{IP: netaddr.IP, Port: uint64(netaddr.Port), Pubkey: pubkey} + return srv.AddPeer(peerAddr) +} + +// AddPeer takes a peerAddr address as argument. +// If not found among connected peers turns to the peerSelector +// to decide if it is a worthwhile connection +func (srv *Server) AddPeer(addr *peerAddr) (err error) { + // need to look up nodeID first + peer := &peerRecord{addr: addr} + if err = srv.PeerSelector.AddPeer(peer); err == nil { + srvlog.Infof("peer %v accepted by peer selection", peer) + } else { + srvlog.Infof("peer %v rejected by peer selection", peer) + } + return } // Broadcast sends an RLP-encoded message to all connected peers. From 707b929acda6d7a10f19443e6673d79d6dda6ed7 Mon Sep 17 00:00:00 2001 From: zelig Date: Tue, 6 Jan 2015 18:15:26 +0000 Subject: [PATCH 03/17] extract cademlia (implements the peer selector interface) to a separate file - maybe should be in a different package... --- p2p/cademlia.go | 177 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 p2p/cademlia.go diff --git a/p2p/cademlia.go b/p2p/cademlia.go new file mode 100644 index 000000000000..d1567b74b030 --- /dev/null +++ b/p2p/cademlia.go @@ -0,0 +1,177 @@ +package p2p + +import ( + "sync" + "time" + + ethlogger "github.com/ethereum/go-ethereum/logger" +) + +var cadlogger = ethlogger.NewLogger("CAD") + +const ( + hashBits = 160 + rowLength = 10 + maxAge = 1 +) + +type Cademlia struct { + hash []byte + hashBits int + rowLength int + rows [hashBits]*row + + depth int + + maxAge time.Duration + purgeInterval time.Duration + + lock sync.RWMutex + quitC chan bool +} + +func newCademlia(hash []byte) *Cademlia { + return &Cademlia{ + hash: hash, + hashBits: hashBits, + rowLength: rowLength, + maxAge: maxAge * time.Second, + rows: [hashBits]*row{}, + } +} + +func (self *Cademlia) Start() error { + self.lock.Lock() + defer self.lock.Unlock() + if self.quitC != nil { + return nil + } + self.quitC = make(chan bool) + go self.purgeLoop() + return nil +} + +func (self *Cademlia) Stop() { + self.lock.Lock() + defer self.lock.Unlock() + if self.quitC == nil { + return + } + close(self.quitC) + self.quitC = nil +} + +func (self *Cademlia) AddPeer(peer peerInfo) (needed bool) { + index := self.commonPrefixLength(peer.Hash()) + row := self.rows[index] + needed = row.insert(&entry{peer: peer}) + if needed { + if index >= self.depth { + go self.updateDepth() + } + cadlogger.Infof("accept peer %x...", peer.Hash()[:8]) + } else { + cadlogger.Infof("reject peer %x... no worse peer found", peer.Hash()[:8]) + } + return +} + +func (self *Cademlia) GetPeers(target []byte) (peers []peerInfo) { + index := self.commonPrefixLength(target) + var entries []*entry + if index >= self.depth { + for i := self.depth; i < self.hashBits; i++ { + entries = append(entries, self.rows[i].row...) + } + } else { + entries = self.rows[index].row + } + return +} + +type entry struct { + peer peerInfo + // metadata +} + +type row struct { + length int + row []*entry + lock sync.RWMutex +} + +func (self *row) insert(entry *entry) (ok bool) { + self.lock.Lock() + defer self.lock.Unlock() + if len(self.row) >= self.length { + worst := self.worst() + // err = diconnectF(self.row[worst]) + self.row[worst] = entry + } else { + self.row = append(self.row, entry) + ok = true + } + return +} + +func (self *row) worst() (index int) { + var oldest time.Time + for i, entry := range self.row { + if (oldest == time.Time{}) || entry.peer.LastActive().Before(oldest) { + oldest = entry.peer.LastActive() + index = i + } + } + return +} + +func (self *row) purge(recently time.Time) { + var newRow []*entry + for _, entry := range self.row { + if !entry.peer.LastActive().Before(recently) { + newRow = append(newRow, entry) + } else { + // self.DisconnectF(entry.peer) + } + } +} + +func Hash(key []byte) []byte { + return key +} + +func (self *Cademlia) updateDepth() { +} + +func (self *Cademlia) purgeLoop() { + ticker := time.Tick(self.purgeInterval) + for { + select { + case <-self.quitC: + return + case <-ticker: + for _, r := range self.rows { + r.purge(time.Now().Add(-self.maxAge)) + } + } + } +} + +func Xor(one, other []byte) (xor []byte) { + for i := 0; i < len(one); i++ { + xor[i] = one[i] ^ other[i] + } + return +} + +func (self *Cademlia) commonPrefixLength(other []byte) (ret int) { + xor := Xor(self.hash, other) + for i := 0; i < self.hashBits; i++ { + for j := 0; j < 8; j++ { + if (xor[i]>>uint8(7-j))&0x1 != 0 { + return i*8 + j + } + } + } + return self.hashBits*8 - 1 +} From 04bb359ba835a5589f42bd0eaf1f99c395bc716e Mon Sep 17 00:00:00 2001 From: zelig Date: Tue, 6 Jan 2015 18:15:52 +0000 Subject: [PATCH 04/17] todo --- p2p/TODO | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/p2p/TODO b/p2p/TODO index cd5891b7669e..20ede98299ea 100644 --- a/p2p/TODO +++ b/p2p/TODO @@ -1 +1,8 @@ -Change protocol getPeers message response conditional on target. peerList if noTarget else server.getPeers + + +- make sure lastActive is saved at disconnect +- put peer selection loop for inbound connections into verifyPeer +- use peerError in peer selector + + + From 3d41559fda0ed39207dc471d7cc08e3389272785 Mon Sep 17 00:00:00 2001 From: zelig Date: Tue, 6 Jan 2015 18:16:44 +0000 Subject: [PATCH 05/17] adjust peer selector interface - peerInfo interface has Disconnect(Reason) func only - peerSelector.AddPeer returns boolean only (for acceptance); all connection is managed by the server - peerSelector.GetPeers returns peer objects remove cademlia implementation into a different file --- p2p/peer_selector.go | 188 ++----------------------------------------- 1 file changed, 7 insertions(+), 181 deletions(-) diff --git a/p2p/peer_selector.go b/p2p/peer_selector.go index 81c644ad408c..dc4f953abcf4 100644 --- a/p2p/peer_selector.go +++ b/p2p/peer_selector.go @@ -2,9 +2,7 @@ package p2p import ( "encoding/json" - "fmt" "path" - "sync" "time" "github.com/ethereum/go-ethereum/ethutil" @@ -13,30 +11,28 @@ import ( type peerInfo interface { Addr() *peerAddr Hash() []byte - // Pubkey() []byte LastActive() time.Time - Disconnect() error - Connect() error + Disconnect(DiscReason) } type peerSelector interface { - AddPeer(peer peerInfo) error - GetPeers(target ...[]byte) []*peerAddr + AddPeer(peer peerInfo) bool + GetPeers(target ...[]byte) []peerInfo Start() error Stop() error } type BaseSelector struct { DirPath string - getPeers func() []*peerAddr + getPeers func() []peerInfo peers []peerInfo } -func (self *BaseSelector) AddPeer(peer peerInfo) error { - return nil +func (self *BaseSelector) AddPeer(peer peerInfo) bool { + return true } -func (self *BaseSelector) GetPeers(target ...[]byte) []*peerAddr { +func (self *BaseSelector) GetPeers(target ...[]byte) []peerInfo { return self.getPeers() } @@ -81,173 +77,3 @@ func ReadPeers(path string) (peers []peerInfo, err error) { } return } - -const ( - hashBits = 160 - rowLength = 10 - maxAge = 1 -) - -type Cademlia struct { - hash []byte - hashBits int - rowLength int - // index map[string]peerInfo - rows [hashBits]*row - - depth int - - maxAge time.Duration - purgeInterval time.Duration - - lock sync.RWMutex - quitC chan bool -} - -func newCademlia(hash []byte) *Cademlia { - return &Cademlia{ - hash: hash, - hashBits: hashBits, - rowLength: rowLength, - maxAge: maxAge * time.Second, - rows: [hashBits]*row{}, - // index: make(map[string]peerInfo), - } -} - -func (self *Cademlia) Start() error { - self.lock.Lock() - defer self.lock.Unlock() - if self.quitC != nil { - return nil - } - self.quitC = make(chan bool) - go self.purgeLoop() - return nil -} - -func (self *Cademlia) Stop() { - self.lock.Lock() - defer self.lock.Unlock() - if self.quitC == nil { - return - } - close(self.quitC) - self.quitC = nil -} - -func (self *Cademlia) AddPeer(peer peerInfo) (err error) { - index := self.commonPrefixLength(peer.Hash()) - row := self.rows[index] - needed := row.insert(&entry{peer: peer}) - if needed { - if index >= self.depth { - self.updateDepth() - } - } else { - err = fmt.Errorf("no worse peer found") - } - return -} - -func (self *Cademlia) GetPeers(target []byte) (peers []*peerAddr) { - index := self.commonPrefixLength(target) - var entries []*entry - if index >= self.depth { - for i := self.depth; i < self.hashBits; i++ { - entries = append(entries, self.rows[i].row...) - } - } else { - entries = self.rows[index].row - } - - for _, entry := range entries { - peers = append(peers, entry.peer.Addr()) - } - return -} - -type entry struct { - peer peerInfo - // metadata -} - -type row struct { - length int - row []*entry - lock sync.RWMutex -} - -func (self *row) insert(entry *entry) (ok bool) { - self.lock.Lock() - defer self.lock.Unlock() - if len(self.row) >= self.length { - self.row[self.worst()] = entry - } else { - self.row = append(self.row, entry) - ok = true - } - return -} - -func (self *row) worst() (index int) { - var oldest time.Time - for i, entry := range self.row { - if (oldest == time.Time{}) || entry.peer.LastActive().Before(oldest) { - oldest = entry.peer.LastActive() - index = i - } - } - return -} - -func (self *row) purge(recently time.Time) { - var newRow []*entry - for _, entry := range self.row { - if !entry.peer.LastActive().Before(recently) { - newRow = append(newRow, entry) - } else { - entry.peer.Disconnect() - } - } -} - -func Hash(key []byte) []byte { - return key -} - -func (self *Cademlia) updateDepth() { -} - -func (self *Cademlia) purgeLoop() { - ticker := time.Tick(self.purgeInterval) - for { - select { - case <-self.quitC: - return - case <-ticker: - for _, r := range self.rows { - r.purge(time.Now().Add(-self.maxAge)) - } - } - } -} - -func Xor(one, other []byte) (xor []byte) { - for i := 0; i < len(one); i++ { - xor[i] = one[i] ^ other[i] - } - return -} - -func (self *Cademlia) commonPrefixLength(other []byte) (ret int) { - xor := Xor(self.hash, other) - for i := 0; i < self.hashBits; i++ { - for j := 0; j < 8; j++ { - if (xor[i]>>uint8(7-j))&0x1 != 0 { - return i*8 + j - } - } - } - return self.hashBits*8 - 1 -} From 72deabeb422b4ffacfa79c12d5baf2251dd3e74c Mon Sep 17 00:00:00 2001 From: zelig Date: Tue, 6 Jan 2015 18:20:07 +0000 Subject: [PATCH 06/17] simplify server logic - get rid of dialLoop entirely - get rid of peerConnect channel in favour of simple function call - server handles the peerSelector hook for prospective outbound peers - no need for NoDial anymore really - changed how peer is initialised --- p2p/server.go | 160 +++++++++++++++++++------------------------------- 1 file changed, 59 insertions(+), 101 deletions(-) diff --git a/p2p/server.go b/p2p/server.go index dff4738ad518..ec09647011bb 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -67,7 +67,7 @@ type Server struct { // Hook for testing. This is useful because we can inhibit // the whole protocol stack. - newPeerFunc peerFunc + // newPeerFunc peerFunc lock sync.RWMutex running bool @@ -79,7 +79,6 @@ type Server struct { quit chan struct{} wg sync.WaitGroup - peerConnect chan *peerAddr peerDisconnect chan *Peer } @@ -95,7 +94,7 @@ type NAT interface { type peerFunc func(srv *Server, c net.Conn, dialAddr *peerAddr) *Peer -// Peers returns all connected peers. +// Peers returns all currently connected peers. func (srv *Server) Peers() (peers []*Peer) { srv.lock.RLock() defer srv.lock.RUnlock() @@ -107,33 +106,18 @@ func (srv *Server) Peers() (peers []*Peer) { return } -// GetActivePeers returns addresses of all connected peers. -func (srv *Server) GetActivePeers() (peers []*peerAddr) { - for _, peer := range srv.Peers() { - if peer != nil { - peer.infolock.Lock() - addr := peer.listenAddr - peer.infolock.Unlock() - // filter out this peer and peers that are not listening or - // have not completed the handshake. - // TODO: track previously sent peers and exclude them as well. - if addr == nil { - continue - } - peers = append(peers, addr) - } - } - return peers -} - // GetPeers returns addresses near target if given or supported by the client // or falls back to all actively connected peers -func (srv *Server) GetPeers(target ...[]byte) (peers []*peerAddr) { +func (srv *Server) GetPeers(target ...[]byte) []*peerAddr { if len(target) == 1 { // delegate to selector - srv.PeerSelector.GetPeers(target[0]) - return peers + return ActiveAddresses(srv.PeerSelector.GetPeers(target[0])...) } else { - return srv.GetActivePeers() + // in fact it is not clear why the selector would not want to reply to this case as well + var peers []peerInfo + for _, peer := range srv.Peers() { + peers = append(peers, peer) + } + return ActiveAddresses(peers...) } } @@ -152,7 +136,7 @@ func (srv *Server) SuggestPeer(addr string, pubkey []byte) error { srvlog.Errorf("couldn't resolve %s:", addr, err) return err } - peerAddr := &peerAddr{IP: netaddr.IP, Port: uint64(netaddr.Port), Pubkey: pubkey} + peerAddr := &peerAddr{netaddr.IP, uint64(netaddr.Port), pubkey} return srv.AddPeer(peerAddr) } @@ -161,13 +145,41 @@ func (srv *Server) SuggestPeer(addr string, pubkey []byte) error { // to decide if it is a worthwhile connection func (srv *Server) AddPeer(addr *peerAddr) (err error) { // need to look up nodeID first - peer := &peerRecord{addr: addr} - if err = srv.PeerSelector.AddPeer(peer); err == nil { - srvlog.Infof("peer %v accepted by peer selection", peer) + peer := &Peer{ + dialAddr: addr, + lastActiveC: make(chan time.Time), + lastActive: time.Now().Add(-24 * time.Hour), + } + if srv.PeerSelector.AddPeer(peer) { + srvlog.Infof("peer %v accepted by peer selection", addr) + err = srv.dialPeer(peer) } else { - srvlog.Infof("peer %v rejected by peer selection", peer) + srvlog.Infof("peer %v rejected by peer selection", addr) + } + return +} + +func (srv *Server) dialPeer(peer *Peer) (err error) { + timeout := time.After(5 * time.Second) + select { + case <-timeout: + err = fmt.Errorf("Too many connections. No slot available") + case slot := <-srv.peerSlots: // there is a slot available + srvlog.Debugf("Dialing %v (slot %d)\n", peer.dialAddr, slot) + conn, dialErr := srv.Dialer.Dial(peer.dialAddr.Network(), peer.dialAddr.String()) + if dialErr != nil { + err = fmt.Errorf("Dial error: %v", dialErr) + srvlog.Errorln(err) + srv.peerSlots <- slot + return + } + peer.slot = slot + peer.connect(srv, conn) + srv.wg.Add(1) + go srv.addPeer(peer) } return + } // Broadcast sends an RLP-encoded message to all connected peers. @@ -211,11 +223,10 @@ func (srv *Server) Start() (err error) { srv.quit = make(chan struct{}) srv.peers = make([]*Peer, srv.MaxPeers) srv.peerSlots = make(chan int, srv.MaxPeers) - srv.peerConnect = make(chan *peerAddr, outboundAddressPoolSize) srv.peerDisconnect = make(chan *Peer) - if srv.newPeerFunc == nil { - srv.newPeerFunc = newServerPeer - } + // if srv.newPeerFunc == nil { + // srv.newPeerFunc = newServerPeer + // } if srv.Blacklist == nil { srv.Blacklist = NewBlacklist() } @@ -228,10 +239,10 @@ func (srv *Server) Start() (err error) { return err } } - if !srv.NoDial { - srv.wg.Add(1) - go srv.dialLoop() - } + // if !srv.NoDial { + // srv.wg.Add(1) + // go srv.dialLoop() + // } if srv.NoDial && srv.ListenAddr == "" { srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.") } @@ -319,8 +330,10 @@ func (srv *Server) listenLoop() { srv.peerSlots <- slot return } - srvlog.Debugf("Accepted conn %v (slot %d)\n", conn.RemoteAddr(), slot) - srv.addPeer(conn, nil, slot) + srvlog.Debugf("Accepted conn %v (slot %d) - peer selector check after handshake", conn.RemoteAddr(), slot) + peer := &Peer{slot: slot} + peer.connect(srv, conn) + srv.addPeer(peer) case <-srv.quit: return } @@ -366,77 +379,22 @@ func (srv *Server) removePortMapping(port int) { srv.NAT.DeletePortMapping("tcp", port, port) } -func (srv *Server) dialLoop() { - defer srv.wg.Done() - var ( - suggest chan *peerAddr - slot *int - slots = srv.peerSlots - ) - for { - select { - case i := <-slots: - // we need a peer in slot i, slot reserved - slot = &i - // now we can watch for candidate peers in the next loop - suggest = srv.peerConnect - // do not consume more until candidate peer is found - slots = nil - - case desc := <-suggest: - // candidate peer found, will dial out asyncronously - // if connection fails slot will be released - srvlog.Infof("dial %v (%v)", desc, *slot) - go srv.dialPeer(desc, *slot) - // we can watch if more peers needed in the next loop - slots = srv.peerSlots - // until then we dont care about candidate peers - suggest = nil - - case <-srv.quit: - // give back the currently reserved slot - if slot != nil { - srv.peerSlots <- *slot - } - return - } - } -} - -// connect to peer via dial out -func (srv *Server) dialPeer(desc *peerAddr, slot int) { - srvlog.Debugf("Dialing %v (slot %d)\n", desc, slot) - conn, err := srv.Dialer.Dial(desc.Network(), desc.String()) - if err != nil { - srvlog.Errorf("Dial error: %v", err) - srv.peerSlots <- slot - return - } - go srv.addPeer(conn, desc, slot) -} - // creates the new peer object and inserts it into its slot -func (srv *Server) addPeer(conn net.Conn, desc *peerAddr, slot int) *Peer { +func (srv *Server) addPeer(peer *Peer) { srv.lock.Lock() defer srv.lock.Unlock() - if !srv.running { - conn.Close() - srv.peerSlots <- slot // release slot - return nil - } - peer := srv.newPeerFunc(srv, conn, desc) - peer.slot = slot - srv.peers[slot] = peer + srvlog.Debugf("Add peer %v (slot %v)\n", peer, peer.slot) + srv.peers[peer.slot] = peer srv.peerCount++ go func() { peer.loop(); srv.peerDisconnect <- peer }() - return peer + return } // removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot func (srv *Server) removePeer(peer *Peer) { srv.lock.Lock() defer srv.lock.Unlock() - srvlog.Debugf("Removing %v (slot %v)\n", peer, peer.slot) + srvlog.Debugf("Remove peer %v (slot %v)\n", peer, peer.slot) if srv.peers[peer.slot] != peer { srvlog.Warnln("Invalid peer to remove:", peer) return From b2bcb3210742eb3c7c321f999efaf0e9c2bb74fc Mon Sep 17 00:00:00 2001 From: zelig Date: Tue, 6 Jan 2015 18:23:16 +0000 Subject: [PATCH 07/17] Peer now implements the peerInfo interface - changed peer initialisation (now peer can be just a record) - peer.Addr (always listening address) - peer.Hash() memoized hash of the address (pubkey from dialaddr or listenaddr) - LastActive updates with channel - ActiveAddresses convenience filter method returns addresses of all connected peers --- p2p/peer.go | 137 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 78 insertions(+), 59 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index de21f60dd967..facdaacb1c10 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -49,53 +49,16 @@ func (d peerAddr) RlpData() interface{} { return []interface{}{d.IP, d.Port, d.Pubkey} } -type peerRecord struct { - addr *peerAddr - hash []byte - lastActive time.Time - lastActiveC chan time.Time - peer *Peer -} - -func (self *peerRecord) Addr() *peerAddr { - return self.addr -} - -func (self *peerRecord) Hash() []byte { - if self.hash == nil { - self.hash = Hash(self.addr.Pubkey) - } - return self.hash -} - -func (self *peerRecord) LastActive() (lastActive time.Time) { - var ok bool - select { - case lastActive, ok = <-self.lastActiveC: - if ok { - self.lastActive = lastActive - } - default: - lastActive = self.lastActive - } - return -} - -func (self *peerRecord) Connect() error { - return nil -} - -func (self *peerRecord) Disconnect() error { - return nil -} - // Peer represents a remote peer. type Peer struct { // Peers have all the log methods. // Use them to display messages related to the peer. *logger.Logger - infolock sync.Mutex + lastActive time.Time // updated persisted + lastActiveC chan time.Time // for constant querying + + infolock sync.RWMutex identity ClientIdentity caps []Cap listenAddr *peerAddr // what remote peer is listening on @@ -125,6 +88,7 @@ type Peer struct { // These fields are kept so base protocol can access them. // TODO: this should be one or more interfaces + hash []byte // hash of pubkey used as address ourID ClientIdentity // client id of the Server ourListenAddr *peerAddr // listen addr of Server, nil if not listening addPeer func(*peerAddr) error // tell server about received peers @@ -135,41 +99,96 @@ type Peer struct { // NewPeer returns a peer for testing purposes. func NewPeer(id ClientIdentity, caps []Cap) *Peer { conn, _ := net.Pipe() - peer := newPeer(conn, nil, nil) + peer := &Peer{} + peer.init(conn) peer.setHandshakeInfo(id, nil, caps) close(peer.closed) return peer } -func newServerPeer(server *Server, conn net.Conn, dialAddr *peerAddr) *Peer { - p := newPeer(conn, server.Protocols, dialAddr) +func (self *Peer) init(conn net.Conn) { + self.conn = conn + self.Logger = logger.NewLogger("P2P " + conn.RemoteAddr().String()) + self.bufconn = bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + self.running = make(map[string]*proto) + self.disc = make(chan DiscReason) + self.protoErr = make(chan error) + self.closed = make(chan struct{}) +} + +func (p *Peer) connect(server *Server, conn net.Conn) { + p.init(conn) p.ourID = server.Identity p.addPeer = server.AddPeer p.getPeers = server.GetPeers p.pubkeyHook = server.verifyPeer p.runBaseProtocol = true + p.protocols = server.Protocols // laddr can be updated concurrently by NAT traversal. // newServerPeer must be called with the server lock held. if server.laddr != nil { p.ourListenAddr = newPeerAddr(server.laddr, server.Identity.Pubkey()) } - return p -} - -func newPeer(conn net.Conn, protocols []Protocol, dialAddr *peerAddr) *Peer { - p := &Peer{ - Logger: logger.NewLogger("P2P " + conn.RemoteAddr().String()), - conn: conn, - dialAddr: dialAddr, - bufconn: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)), - protocols: protocols, - running: make(map[string]*proto), - disc: make(chan DiscReason), - protoErr: make(chan error), - closed: make(chan struct{}), +} + +// ActiveAddresses returns addresses of all connected peers. +// Actually if the peer selector keeps historical info , then once active peers +// will be included too. +func ActiveAddresses(peers ...peerInfo) (addrs []*peerAddr) { + for _, peer := range peers { + if peer != nil { + addr := peer.Addr() + // filter out peers that are not listening or + // have not completed the handshake. + // the peer selector can track previously sent peers and exclude them as well. + if addr == nil { + continue + } + addrs = append(addrs, addr) + } + } + return addrs +} + +// implements the peerInfo interface +func (self *Peer) Addr() *peerAddr { + self.infolock.RLock() + defer self.infolock.RUnlock() + return self.listenAddr +} + +func (self *Peer) Hash() []byte { + if self.hash == nil { + self.hash = Hash(self.Pubkey()) + } + return self.hash +} + +func (self *Peer) Pubkey() (pubkey []byte) { + self.infolock.Lock() + defer self.infolock.Unlock() + if self.dialAddr != nil { + pubkey = self.dialAddr.Pubkey + } else { + if self.listenAddr != nil { + pubkey = self.listenAddr.Pubkey + } + } + return +} + +func (self *Peer) LastActive() (lastActive time.Time) { + var ok bool + select { + case lastActive, ok = <-self.lastActiveC: + if ok { + self.lastActive = lastActive + } + default: + lastActive = self.lastActive } - return p + return } // Identity returns the client identity of the remote peer. The From 34c7c6b767004a7cd9ff73b5d82c7ca9a55de535 Mon Sep 17 00:00:00 2001 From: zelig Date: Wed, 7 Jan 2015 18:07:30 +0000 Subject: [PATCH 08/17] variable name left from earlier code --- p2p/protocol_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/protocol_test.go b/p2p/protocol_test.go index 4ea38edc28c7..052f1d10a7fb 100644 --- a/p2p/protocol_test.go +++ b/p2p/protocol_test.go @@ -81,7 +81,7 @@ func TestBaseProtocolPeers(t *testing.T) { peer1.ourListenAddr = listenAddr peer1.getPeers = func(...[]byte) []*peerAddr { pl := make([]*peerAddr, len(peerList)) - for i, addr := range cannedPeerList { + for i, addr := range peerList { pl[i] = addr } return pl From 211ee7e4f3188fa9ad0e5e2de0425be92e2fd69d Mon Sep 17 00:00:00 2001 From: zelig Date: Wed, 7 Jan 2015 18:08:10 +0000 Subject: [PATCH 09/17] tests pass - connectFunc on server, by default it calles srv.connect(peer, conn) - srv.connect(peer, conn) is the init code from peer - this still provides a nice hook for testing - adapt server and peer tests --- p2p/peer.go | 16 ------------- p2p/peer_test.go | 4 +++- p2p/server.go | 31 ++++++++++++++++++++---- p2p/server_test.go | 59 +++++++++++++++++++++++----------------------- 4 files changed, 59 insertions(+), 51 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index facdaacb1c10..a14a7f589e52 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -116,22 +116,6 @@ func (self *Peer) init(conn net.Conn) { self.closed = make(chan struct{}) } -func (p *Peer) connect(server *Server, conn net.Conn) { - p.init(conn) - p.ourID = server.Identity - p.addPeer = server.AddPeer - p.getPeers = server.GetPeers - p.pubkeyHook = server.verifyPeer - p.runBaseProtocol = true - p.protocols = server.Protocols - - // laddr can be updated concurrently by NAT traversal. - // newServerPeer must be called with the server lock held. - if server.laddr != nil { - p.ourListenAddr = newPeerAddr(server.laddr, server.Identity.Pubkey()) - } -} - // ActiveAddresses returns addresses of all connected peers. // Actually if the peer selector keeps historical info , then once active peers // will be included too. diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 4ee88f112b0f..8c04c137d76b 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -30,7 +30,9 @@ var discard = Protocol{ func testPeer(protos []Protocol) (net.Conn, *Peer, <-chan error) { conn1, conn2 := net.Pipe() - peer := newPeer(conn1, protos, nil) + peer := &Peer{} + peer.init(conn1) + peer.protocols = protos peer.ourID = &peerId{} peer.pubkeyHook = func(*peerAddr) error { return nil } errc := make(chan error, 1) diff --git a/p2p/server.go b/p2p/server.go index ec09647011bb..a9d1d3989b79 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -77,6 +77,8 @@ type Server struct { peerSlots chan int peerCount int + connectFunc func(*Peer, net.Conn) + quit chan struct{} wg sync.WaitGroup peerDisconnect chan *Peer @@ -92,8 +94,6 @@ type NAT interface { String() string } -type peerFunc func(srv *Server, c net.Conn, dialAddr *peerAddr) *Peer - // Peers returns all currently connected peers. func (srv *Server) Peers() (peers []*Peer) { srv.lock.RLock() @@ -145,6 +145,8 @@ func (srv *Server) SuggestPeer(addr string, pubkey []byte) error { // to decide if it is a worthwhile connection func (srv *Server) AddPeer(addr *peerAddr) (err error) { // need to look up nodeID first + srvlog.Infof("checking peer %v", addr) + peer := &Peer{ dialAddr: addr, lastActiveC: make(chan time.Time), @@ -173,15 +175,30 @@ func (srv *Server) dialPeer(peer *Peer) (err error) { srv.peerSlots <- slot return } + srvlog.Debugf("Connected to %v (slot %d)\n", peer.dialAddr, slot) peer.slot = slot - peer.connect(srv, conn) - srv.wg.Add(1) + srv.connectFunc(peer, conn) go srv.addPeer(peer) } return } +func (srv *Server) connect(p *Peer, conn net.Conn) { + p.init(conn) + p.ourID = srv.Identity + p.addPeer = srv.AddPeer + p.getPeers = srv.GetPeers + p.pubkeyHook = srv.verifyPeer + p.runBaseProtocol = true + p.protocols = srv.Protocols + + // laddr can be updated concurrently by NAT traversal. + if srv.laddr != nil { + p.ourListenAddr = newPeerAddr(srv.laddr, srv.Identity.Pubkey()) + } +} + // Broadcast sends an RLP-encoded message to all connected peers. // This method is deprecated and will be removed later. func (srv *Server) Broadcast(protocol string, code uint64, data ...interface{}) { @@ -251,6 +268,10 @@ func (srv *Server) Start() (err error) { srv.PeerSelector = &BaseSelector{} } + if srv.connectFunc == nil { + srv.connectFunc = srv.connect + } + // make all slots available for i := range srv.peers { srv.peerSlots <- i @@ -332,7 +353,7 @@ func (srv *Server) listenLoop() { } srvlog.Debugf("Accepted conn %v (slot %d) - peer selector check after handshake", conn.RemoteAddr(), slot) peer := &Peer{slot: slot} - peer.connect(srv, conn) + srv.connectFunc(peer, conn) srv.addPeer(peer) case <-srv.quit: return diff --git a/p2p/server_test.go b/p2p/server_test.go index ceb89e3f7fb7..2545aecfc8c6 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -9,12 +9,12 @@ import ( "time" ) -func startTestServer(t *testing.T, pf peerFunc) *Server { +func startTestServer(t *testing.T, cb func(*Peer, net.Conn)) *Server { server := &Server{ Identity: &peerId{}, MaxPeers: 10, ListenAddr: "127.0.0.1:0", - newPeerFunc: pf, + connectFunc: cb, } if err := server.Start(); err != nil { t.Fatalf("Could not start server: %v", err) @@ -27,16 +27,9 @@ func TestServerListen(t *testing.T) { // start the test server connected := make(chan *Peer) - srv := startTestServer(t, func(srv *Server, conn net.Conn, dialAddr *peerAddr) *Peer { - if conn == nil { - t.Error("peer func called with nil conn") - } - if dialAddr != nil { - t.Error("peer func called with non-nil dialAddr") - } - peer := newPeer(conn, nil, dialAddr) + srv := startTestServer(t, func(peer *Peer, conn net.Conn) { + peer.init(conn) connected <- peer - return peer }) defer close(connected) defer srv.Stop() @@ -50,10 +43,18 @@ func TestServerListen(t *testing.T) { select { case peer := <-connected: - if peer.conn.LocalAddr().String() != conn.RemoteAddr().String() { - t.Errorf("peer started with wrong conn: got %v, want %v", - peer.conn.LocalAddr(), conn.RemoteAddr()) + if peer.conn == nil { + t.Error("peer setup with nil conn") + } else { + if peer.conn.LocalAddr().String() != conn.RemoteAddr().String() { + t.Errorf("peer started with wrong conn: got %v, want %v", + peer.conn.LocalAddr(), conn.RemoteAddr()) + } + } + if peer.dialAddr != nil { + t.Error("peer setup with non-nil dialAddr") } + case <-time.After(1 * time.Second): t.Error("server did not accept within one second") } @@ -72,7 +73,7 @@ func TestServerDial(t *testing.T) { go func() { conn, err := listener.Accept() if err != nil { - t.Error("acccept error:", err) + t.Error("accept error:", err) } conn.Close() accepted <- conn @@ -80,28 +81,28 @@ func TestServerDial(t *testing.T) { // start the test server connected := make(chan *Peer) - srv := startTestServer(t, func(srv *Server, conn net.Conn, dialAddr *peerAddr) *Peer { - if conn == nil { - t.Error("peer func called with nil conn") - } - peer := newPeer(conn, nil, dialAddr) - connected <- peer - return peer + srv := startTestServer(t, func(peer *Peer, conn net.Conn) { + peer.init(conn) + go func() { connected <- peer }() }) defer close(connected) defer srv.Stop() // tell the server to connect. connAddr := newPeerAddr(listener.Addr(), nil) - srv.peerConnect <- connAddr + srv.AddPeer(connAddr) select { case conn := <-accepted: select { case peer := <-connected: - if peer.conn.RemoteAddr().String() != conn.LocalAddr().String() { - t.Errorf("peer started with wrong conn: got %v, want %v", - peer.conn.RemoteAddr(), conn.LocalAddr()) + if conn == nil { + t.Error("peer func called with nil conn") + } else { + if peer.conn.RemoteAddr().String() != conn.LocalAddr().String() { + t.Errorf("peer started with wrong conn: got %v, want %v", + peer.conn.RemoteAddr(), conn.LocalAddr()) + } } if peer.dialAddr != connAddr { t.Errorf("peer started with wrong dialAddr: got %v, want %v", @@ -119,11 +120,11 @@ func TestServerDial(t *testing.T) { func TestServerBroadcast(t *testing.T) { defer testlog(t).detach() var connected sync.WaitGroup - srv := startTestServer(t, func(srv *Server, c net.Conn, dialAddr *peerAddr) *Peer { - peer := newPeer(c, []Protocol{discard}, dialAddr) + srv := startTestServer(t, func(peer *Peer, conn net.Conn) { + peer.init(conn) + peer.protocols = []Protocol{discard} peer.startSubprotocols([]Cap{discard.cap()}) connected.Done() - return peer }) defer srv.Stop() From caa651aaecf38f84f02731b7585dcc39552a7c22 Mon Sep 17 00:00:00 2001 From: zelig Date: Wed, 7 Jan 2015 18:35:10 +0000 Subject: [PATCH 10/17] Info log message for connected peers --- p2p/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/server.go b/p2p/server.go index a9d1d3989b79..4717b895d7fc 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -167,7 +167,7 @@ func (srv *Server) dialPeer(peer *Peer) (err error) { case <-timeout: err = fmt.Errorf("Too many connections. No slot available") case slot := <-srv.peerSlots: // there is a slot available - srvlog.Debugf("Dialing %v (slot %d)\n", peer.dialAddr, slot) + srvlog.Infof("Dialing %v (slot %d)\n", peer.dialAddr, slot) conn, dialErr := srv.Dialer.Dial(peer.dialAddr.Network(), peer.dialAddr.String()) if dialErr != nil { err = fmt.Errorf("Dial error: %v", dialErr) @@ -175,7 +175,7 @@ func (srv *Server) dialPeer(peer *Peer) (err error) { srv.peerSlots <- slot return } - srvlog.Debugf("Connected to %v (slot %d)\n", peer.dialAddr, slot) + srvlog.Infof("Connected to %v (slot %d)\n", peer.dialAddr, slot) peer.slot = slot srv.connectFunc(peer, conn) go srv.addPeer(peer) From 13ca81544b8530dff8f9a541251928f0c43e6783 Mon Sep 17 00:00:00 2001 From: zelig Date: Wed, 7 Jan 2015 18:49:57 +0000 Subject: [PATCH 11/17] NoDial option now effective within AddPeer -> returns error --- p2p/server.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/p2p/server.go b/p2p/server.go index 4717b895d7fc..73583b70902a 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -60,6 +60,8 @@ type Server struct { Dialer *net.Dialer // If NoDial is true, the server will not dial any peers. + // this maybe used in test environments where we want to prevent a node from + // connecting (and synchronising) with other nodes NoDial bool // peer selector @@ -144,9 +146,10 @@ func (srv *Server) SuggestPeer(addr string, pubkey []byte) error { // If not found among connected peers turns to the peerSelector // to decide if it is a worthwhile connection func (srv *Server) AddPeer(addr *peerAddr) (err error) { + if srv.NoDial { + return fmt.Errorln("no dial out") + } // need to look up nodeID first - srvlog.Infof("checking peer %v", addr) - peer := &Peer{ dialAddr: addr, lastActiveC: make(chan time.Time), @@ -256,13 +259,6 @@ func (srv *Server) Start() (err error) { return err } } - // if !srv.NoDial { - // srv.wg.Add(1) - // go srv.dialLoop() - // } - if srv.NoDial && srv.ListenAddr == "" { - srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.") - } if srv.PeerSelector == nil { srv.PeerSelector = &BaseSelector{} From c8aa9f1e75c22f64444e98b78eb1bdd18ebb0a53 Mon Sep 17 00:00:00 2001 From: zelig Date: Thu, 8 Jan 2015 11:35:55 +0000 Subject: [PATCH 12/17] peer error introduces more differentiation and verify func is called on peer - keep missing, mismatch and invalid pubkey errors to errPubkeyInvalid -> DiscInvalidIdentity - introduce errBlacklistedPeer, errSelfConnection, errConnectedPeer, errRejectedPeer -> DiscUselessPeer - peer.pubkeyHook -> peer.verifyPeerHook - peer.verifyPeerHook called on peer by protocol after handshake info set - peer.Pubkey looks into identity.Pubkey too (redundant though) --- p2p/peer.go | 23 ++++++++++++----------- p2p/peer_error.go | 16 ++++++++++++---- p2p/peer_test.go | 2 +- p2p/protocol.go | 9 ++++----- p2p/protocol_test.go | 4 ++-- p2p/server.go | 17 +++++++++-------- 6 files changed, 40 insertions(+), 31 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index a14a7f589e52..87aaa10afe64 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -88,12 +88,12 @@ type Peer struct { // These fields are kept so base protocol can access them. // TODO: this should be one or more interfaces - hash []byte // hash of pubkey used as address - ourID ClientIdentity // client id of the Server - ourListenAddr *peerAddr // listen addr of Server, nil if not listening - addPeer func(*peerAddr) error // tell server about received peers - getPeers func(...[]byte) []*peerAddr // should return the list of all peers - pubkeyHook func(*peerAddr) error // called at end of handshake to validate pubkey + hash []byte // hash of pubkey used as address + ourID ClientIdentity // client id of the Server + ourListenAddr *peerAddr // listen addr of Server, nil if not listening + addPeer func(*peerAddr) error // tell server about received peers + getPeers func(...[]byte) []*peerAddr // should return the list of all peers + verifyPeerHook func(*Peer) error // called at end of handshake to validate peer } // NewPeer returns a peer for testing purposes. @@ -152,12 +152,13 @@ func (self *Peer) Hash() []byte { func (self *Peer) Pubkey() (pubkey []byte) { self.infolock.Lock() defer self.infolock.Unlock() - if self.dialAddr != nil { + switch { + case self.identity != nil: + pubkey = self.identity.Pubkey() + case self.dialAddr != nil: pubkey = self.dialAddr.Pubkey - } else { - if self.listenAddr != nil { - pubkey = self.listenAddr.Pubkey - } + case self.listenAddr != nil: + pubkey = self.listenAddr.Pubkey } return } diff --git a/p2p/peer_error.go b/p2p/peer_error.go index 0eb7ec838d45..73587ddca408 100644 --- a/p2p/peer_error.go +++ b/p2p/peer_error.go @@ -14,7 +14,11 @@ const ( errP2PVersionMismatch errPubkeyMissing errPubkeyInvalid - errPubkeyForbidden + errPubkeyMismatch + errBlacklistedPeer + errSelfConnection + errConnectedPeer + errRejectedPeer errProtocolBreach errPingTimeout errInvalidNetworkId @@ -31,7 +35,11 @@ var errorToString = map[int]string{ errP2PVersionMismatch: "P2P Version Mismatch", errPubkeyMissing: "Public key missing", errPubkeyInvalid: "Public key invalid", - errPubkeyForbidden: "Public key forbidden", + errPubkeyMismatch: "Public key mismatch", + errBlacklistedPeer: "Blacklisted peer", + errSelfConnection: "Self connection", + errConnectedPeer: "Connected peer", + errRejectedPeer: "Rejected peer", errProtocolBreach: "Protocol Breach", errPingTimeout: "Ping timeout", errInvalidNetworkId: "Invalid network id", @@ -117,9 +125,9 @@ func discReasonForError(err error) DiscReason { switch peerError.Code { case errP2PVersionMismatch: return DiscIncompatibleVersion - case errPubkeyMissing, errPubkeyInvalid: + case errPubkeyMissing, errPubkeyMismatch, errPubkeyInvalid: return DiscInvalidIdentity - case errPubkeyForbidden: + case errBlacklistedPeer, errSelfConnection, errConnectedPeer, errRejectedPeer: return DiscUselessPeer case errInvalidMsgCode, errMagicTokenMismatch, errProtocolBreach: return DiscProtocolError diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 8c04c137d76b..c8ccf250208d 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -34,7 +34,7 @@ func testPeer(protos []Protocol) (net.Conn, *Peer, <-chan error) { peer.init(conn1) peer.protocols = protos peer.ourID = &peerId{} - peer.pubkeyHook = func(*peerAddr) error { return nil } + peer.verifyPeerHook = func(*Peer) error { return nil } errc := make(chan error, 1) go func() { _, err := peer.loop() diff --git a/p2p/protocol.go b/p2p/protocol.go index 7aabe2860d3f..908cdc816172 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -249,13 +249,9 @@ func (bp *baseProtocol) readHandshake() error { // verify that the peer we wanted to connect to // actually holds the target public key. if da.Pubkey != nil && !bytes.Equal(da.Pubkey, hs.NodeID) { - return newPeerError(errPubkeyForbidden, "dial address pubkey mismatch") + return newPeerError(errPubkeyMismatch, "dial address pubkey mismatch: %x vs %x", da.Pubkey, hs.NodeID) } } - pa := newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID) - if err := bp.peer.pubkeyHook(pa); err != nil { - return newPeerError(errPubkeyForbidden, "%v", err) - } // TODO: remove Caps with empty name var addr *peerAddr if hs.ListenPort != 0 { @@ -263,6 +259,9 @@ func (bp *baseProtocol) readHandshake() error { addr.Port = hs.ListenPort } bp.peer.setHandshakeInfo(&hs, addr, hs.Caps) + if err := bp.peer.verifyPeerHook(bp.peer); err != nil { + return err + } bp.peer.startSubprotocols(hs.Caps) return nil } diff --git a/p2p/protocol_test.go b/p2p/protocol_test.go index 052f1d10a7fb..6a71e30adda7 100644 --- a/p2p/protocol_test.go +++ b/p2p/protocol_test.go @@ -29,7 +29,7 @@ func (self *peerId) Pubkey() (pubkey []byte) { func newTestPeer() (peer *Peer) { peer = NewPeer(&peerId{}, []Cap{}) - peer.pubkeyHook = func(*peerAddr) error { return nil } + peer.verifyPeerHook = func(*Peer) error { return nil } peer.ourID = &peerId{} peer.listenAddr = &peerAddr{} peer.getPeers = func(...[]byte) []*peerAddr { return nil } @@ -110,7 +110,7 @@ func TestBaseProtocolPeers(t *testing.T) { func TestBaseProtocolDisconnect(t *testing.T) { peer := NewPeer(&peerId{}, nil) peer.ourID = &peerId{} - peer.pubkeyHook = func(*peerAddr) error { return nil } + peer.verifyPeerHook = func(*Peer) error { return nil } rw1, rw2 := MsgPipe() done := make(chan struct{}) diff --git a/p2p/server.go b/p2p/server.go index a9d1d3989b79..35dca03c1dbd 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -189,7 +189,7 @@ func (srv *Server) connect(p *Peer, conn net.Conn) { p.ourID = srv.Identity p.addPeer = srv.AddPeer p.getPeers = srv.GetPeers - p.pubkeyHook = srv.verifyPeer + p.verifyPeerHook = srv.verifyPeer p.runBaseProtocol = true p.protocols = srv.Protocols @@ -427,20 +427,21 @@ func (srv *Server) removePeer(peer *Peer) { srv.peerSlots <- peer.slot } -func (srv *Server) verifyPeer(addr *peerAddr) error { - if srv.Blacklist.Exists(addr.Pubkey) { - return errors.New("blacklisted") +func (srv *Server) verifyPeer(peer *Peer) error { + pubkey := peer.Pubkey() + if srv.Blacklist.Exists(pubkey) { + return newPeerError(errBlacklistedPeer, "") } - if bytes.Equal(srv.Identity.Pubkey()[1:], addr.Pubkey) { - return newPeerError(errPubkeyForbidden, "not allowed to connect to srv") + if bytes.Equal(srv.Identity.Pubkey()[1:], pubkey) { + return newPeerError(errSelfConnection, "") } srv.lock.RLock() defer srv.lock.RUnlock() for _, peer := range srv.peers { if peer != nil { id := peer.Identity() - if id != nil && bytes.Equal(id.Pubkey(), addr.Pubkey) { - return errors.New("already connected") + if id != nil && bytes.Equal(id.Pubkey(), pubkey) { + return newPeerError(errConnectedPeer, "") } } } From 44e9ebbf9197798d7584eb3cea74d09bd333fc65 Mon Sep 17 00:00:00 2001 From: zelig Date: Thu, 8 Jan 2015 11:46:14 +0000 Subject: [PATCH 13/17] peer selector now verifies inbound peers - after handshake when nodeID is known - part of verifyPeerHook - peerSelector.AddPeer now returns error --- p2p/cademlia.go | 8 +++++--- p2p/peer_selector.go | 6 +++--- p2p/server.go | 9 +++++++-- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/p2p/cademlia.go b/p2p/cademlia.go index d1567b74b030..a89023aa0270 100644 --- a/p2p/cademlia.go +++ b/p2p/cademlia.go @@ -1,6 +1,7 @@ package p2p import ( + "fmt" "sync" "time" @@ -61,17 +62,18 @@ func (self *Cademlia) Stop() { self.quitC = nil } -func (self *Cademlia) AddPeer(peer peerInfo) (needed bool) { +func (self *Cademlia) AddPeer(peer peerInfo) (err error) { index := self.commonPrefixLength(peer.Hash()) row := self.rows[index] - needed = row.insert(&entry{peer: peer}) + needed := row.insert(&entry{peer: peer}) if needed { if index >= self.depth { go self.updateDepth() } cadlogger.Infof("accept peer %x...", peer.Hash()[:8]) } else { - cadlogger.Infof("reject peer %x... no worse peer found", peer.Hash()[:8]) + err = fmt.Errorf("no worse peer found") + cadlogger.Infof("reject peer %x..: %v", peer.Hash()[:8], err) } return } diff --git a/p2p/peer_selector.go b/p2p/peer_selector.go index dc4f953abcf4..2c1b2260f8f0 100644 --- a/p2p/peer_selector.go +++ b/p2p/peer_selector.go @@ -16,7 +16,7 @@ type peerInfo interface { } type peerSelector interface { - AddPeer(peer peerInfo) bool + AddPeer(peer peerInfo) error GetPeers(target ...[]byte) []peerInfo Start() error Stop() error @@ -28,8 +28,8 @@ type BaseSelector struct { peers []peerInfo } -func (self *BaseSelector) AddPeer(peer peerInfo) bool { - return true +func (self *BaseSelector) AddPeer(peer peerInfo) error { + return nil } func (self *BaseSelector) GetPeers(target ...[]byte) []peerInfo { diff --git a/p2p/server.go b/p2p/server.go index 35dca03c1dbd..d8285ff24382 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -152,7 +152,7 @@ func (srv *Server) AddPeer(addr *peerAddr) (err error) { lastActiveC: make(chan time.Time), lastActive: time.Now().Add(-24 * time.Hour), } - if srv.PeerSelector.AddPeer(peer) { + if err = srv.PeerSelector.AddPeer(peer); err == nil { srvlog.Infof("peer %v accepted by peer selection", addr) err = srv.dialPeer(peer) } else { @@ -436,7 +436,6 @@ func (srv *Server) verifyPeer(peer *Peer) error { return newPeerError(errSelfConnection, "") } srv.lock.RLock() - defer srv.lock.RUnlock() for _, peer := range srv.peers { if peer != nil { id := peer.Identity() @@ -445,6 +444,12 @@ func (srv *Server) verifyPeer(peer *Peer) error { } } } + srv.lock.RUnlock() + if peer.dialAddr == nil { + if err := srv.PeerSelector.AddPeer(peer); err != nil { + return newPeerError(errRejectedPeer, "%v", err) + } + } return nil } From 15873f5e24cc89e776c3a4f09f399058fa88d6bc Mon Sep 17 00:00:00 2001 From: zelig Date: Fri, 9 Jan 2015 11:34:26 +0000 Subject: [PATCH 14/17] Errorf --- p2p/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/server.go b/p2p/server.go index 861fc401a8a0..819d38607e07 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -147,7 +147,7 @@ func (srv *Server) SuggestPeer(addr string, pubkey []byte) error { // to decide if it is a worthwhile connection func (srv *Server) AddPeer(addr *peerAddr) (err error) { if srv.NoDial { - return fmt.Errorln("no dial out") + return fmt.Errorf("no dial out") } // need to look up nodeID first peer := &Peer{ From 26c625d8e61cc18a16d2d38b13140faca0c55b85 Mon Sep 17 00:00:00 2001 From: zelig Date: Fri, 9 Jan 2015 12:15:31 +0000 Subject: [PATCH 15/17] cademlia improvements - parameter are Cademlia public fields - compulsory Hash, verifiable HashBits, Hash, RowLength, adjustable MaxAge, PurgeInterval, ProxSize - they default to package level constants and vars - row initialisation - adjustProx, updateProx to 'update depth' - proper global locking thanks to Prox - map from entries to peerInfo for GetPeers - call disconnect on the worst peer expunged when inserting - locking for purge - extract self.Distance <- self.DistanceTo (made public method) - call disconnect when purging --- p2p/cademlia.go | 140 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 104 insertions(+), 36 deletions(-) diff --git a/p2p/cademlia.go b/p2p/cademlia.go index a89023aa0270..21f0c98a3d4e 100644 --- a/p2p/cademlia.go +++ b/p2p/cademlia.go @@ -13,31 +13,33 @@ var cadlogger = ethlogger.NewLogger("CAD") const ( hashBits = 160 rowLength = 10 - maxAge = 1 ) +var maxAge = 180 * time.Nanosecond +var purgeInterval = 300 * time.Second + type Cademlia struct { - hash []byte - hashBits int - rowLength int - rows [hashBits]*row + Hash []byte + HashBits int + RowLength int + + MaxProxSize int - depth int + MaxAge time.Duration + PurgeInterval time.Duration - maxAge time.Duration - purgeInterval time.Duration + proxLimit int + proxSize int + + rows []*row lock sync.RWMutex quitC chan bool } -func newCademlia(hash []byte) *Cademlia { +func NewCademlia(hash []byte) *Cademlia { return &Cademlia{ - hash: hash, - hashBits: hashBits, - rowLength: rowLength, - maxAge: maxAge * time.Second, - rows: [hashBits]*row{}, + Hash: hash, // compulsory fields without default } } @@ -47,6 +49,28 @@ func (self *Cademlia) Start() error { if self.quitC != nil { return nil } + // these + self.Hash can and should be checked against the + // saved file/db + if self.HashBits == 0 { + self.HashBits = hashBits + } + if self.RowLength == 0 { + self.RowLength = rowLength + } + // runtime parameters + if self.MaxProxSize == 0 { + self.MaxProxSize = self.RowLength + } + if self.MaxAge == time.Duration(0) { + self.MaxAge = maxAge + } + if self.PurgeInterval == time.Duration(0) { + self.PurgeInterval = purgeInterval + } + self.rows = make([]*row, self.HashBits) + for i, _ := range self.rows { + self.rows[i] = &row{} // will initialise row{int(0),[]*entry(nil),sync.Mutex} + } self.quitC = make(chan bool) go self.purgeLoop() return nil @@ -63,12 +87,14 @@ func (self *Cademlia) Stop() { } func (self *Cademlia) AddPeer(peer peerInfo) (err error) { - index := self.commonPrefixLength(peer.Hash()) + self.lock.Lock() + defer self.lock.Unlock() + index := self.DistanceTo(peer.Hash()) row := self.rows[index] - needed := row.insert(&entry{peer: peer}) - if needed { - if index >= self.depth { - go self.updateDepth() + added := row.insert(&entry{peer: peer}) + if added { + if index >= self.proxLimit { + go self.adjustProx(index, 1) } cadlogger.Infof("accept peer %x...", peer.Hash()[:8]) } else { @@ -78,16 +104,43 @@ func (self *Cademlia) AddPeer(peer peerInfo) (err error) { return } +func (self *Cademlia) adjustProx(r int, add int) { + if self.proxSize+add > self.MaxProxSize && + self.rows[r].len() > 0 { + self.proxLimit-- + } else { + self.proxSize += add + } +} + +func (self *Cademlia) updateProx() { + var sum, proxSize int + for _, r := range self.rows { + sum += r.len() + if sum <= self.MaxProxSize || r.len() == 0 { + proxSize = sum + } + } + self.lock.Lock() + self.proxSize = proxSize + self.lock.Unlock() +} + func (self *Cademlia) GetPeers(target []byte) (peers []peerInfo) { - index := self.commonPrefixLength(target) + self.lock.RLock() + defer self.lock.RUnlock() + index := self.DistanceTo(target) var entries []*entry - if index >= self.depth { - for i := self.depth; i < self.hashBits; i++ { + if index >= self.proxLimit { + for i := self.proxLimit; i < self.HashBits; i++ { entries = append(entries, self.rows[i].row...) } } else { entries = self.rows[index].row } + for _, entry := range entries { + peers = append(peers, entry.peer) + } return } @@ -102,16 +155,23 @@ type row struct { lock sync.RWMutex } -func (self *row) insert(entry *entry) (ok bool) { +func (self *row) len() int { + self.lock.RLock() + defer self.lock.RUnlock() + return self.length +} + +func (self *row) insert(entry *entry) (added bool) { self.lock.Lock() defer self.lock.Unlock() - if len(self.row) >= self.length { + if len(self.row) >= self.length { // >= allows us to add peers beyond the Rowlength limitation worst := self.worst() - // err = diconnectF(self.row[worst]) + self.row[worst].peer.Disconnect(DiscSubprotocolError) self.row[worst] = entry } else { self.row = append(self.row, entry) - ok = true + added = true + self.length++ } return } @@ -128,33 +188,37 @@ func (self *row) worst() (index int) { } func (self *row) purge(recently time.Time) { + self.lock.Lock() var newRow []*entry for _, entry := range self.row { if !entry.peer.LastActive().Before(recently) { newRow = append(newRow, entry) } else { - // self.DisconnectF(entry.peer) + entry.peer.Disconnect(DiscSubprotocolError) } } + self.row = newRow + self.length = len(newRow) + self.lock.Unlock() } func Hash(key []byte) []byte { return key } -func (self *Cademlia) updateDepth() { -} - func (self *Cademlia) purgeLoop() { - ticker := time.Tick(self.purgeInterval) + ticker := time.Tick(self.PurgeInterval) for { select { case <-self.quitC: return case <-ticker: + self.lock.Lock() for _, r := range self.rows { - r.purge(time.Now().Add(-self.maxAge)) + r.purge(time.Now().Add(-self.MaxAge)) } + self.updateProx() + self.lock.Unlock() } } } @@ -166,14 +230,18 @@ func Xor(one, other []byte) (xor []byte) { return } -func (self *Cademlia) commonPrefixLength(other []byte) (ret int) { - xor := Xor(self.hash, other) - for i := 0; i < self.hashBits; i++ { +func (self *Cademlia) DistanceTo(other []byte) (ret int) { + return self.Distance(self.Hash, other) +} + +func (self *Cademlia) Distance(one, other []byte) (ret int) { + xor := Xor(one, other) + for i := 0; i < self.HashBits; i++ { for j := 0; j < 8; j++ { if (xor[i]>>uint8(7-j))&0x1 != 0 { return i*8 + j } } } - return self.hashBits*8 - 1 + return self.HashBits*8 - 1 } From c55c67dbb4eedf033bf64af7060e2ba185d2ea57 Mon Sep 17 00:00:00 2001 From: zelig Date: Fri, 9 Jan 2015 15:01:19 +0000 Subject: [PATCH 16/17] cademlia proximity - distance -> proximity - proximity bin properly implemented - fix update and adjust prox functions - rename fields and methods - comments --- p2p/cademlia.go | 104 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 72 insertions(+), 32 deletions(-) diff --git a/p2p/cademlia.go b/p2p/cademlia.go index 21f0c98a3d4e..e356a3f133e7 100644 --- a/p2p/cademlia.go +++ b/p2p/cademlia.go @@ -2,6 +2,7 @@ package p2p import ( "fmt" + "math" "sync" "time" @@ -11,8 +12,9 @@ import ( var cadlogger = ethlogger.NewLogger("CAD") const ( - hashBits = 160 + hashBytes = 20 rowLength = 10 + maxProx = 20 ) var maxAge = 180 * time.Nanosecond @@ -20,10 +22,11 @@ var purgeInterval = 300 * time.Second type Cademlia struct { Hash []byte - HashBits int + HashBytes int RowLength int - MaxProxSize int + MaxProx int + MaxProxBinSize int MaxAge time.Duration PurgeInterval time.Duration @@ -37,12 +40,16 @@ type Cademlia struct { quitC chan bool } +// public constructor with compulsory arguments +// hash is a byte slice of length equal to self.HashBytes func NewCademlia(hash []byte) *Cademlia { return &Cademlia{ Hash: hash, // compulsory fields without default } } +// Start brings up a pool of peers potentially from an offline persisted source +// and sets default values for optional parameters func (self *Cademlia) Start() error { self.lock.Lock() defer self.lock.Unlock() @@ -51,15 +58,18 @@ func (self *Cademlia) Start() error { } // these + self.Hash can and should be checked against the // saved file/db - if self.HashBits == 0 { - self.HashBits = hashBits + if self.HashBytes == 0 { + self.HashBytes = hashBytes + } + if self.MaxProx == 0 { + self.MaxProx = maxProx } if self.RowLength == 0 { self.RowLength = rowLength } // runtime parameters - if self.MaxProxSize == 0 { - self.MaxProxSize = self.RowLength + if self.MaxProxBinSize == 0 { + self.MaxProxBinSize = self.RowLength } if self.MaxAge == time.Duration(0) { self.MaxAge = maxAge @@ -67,7 +77,7 @@ func (self *Cademlia) Start() error { if self.PurgeInterval == time.Duration(0) { self.PurgeInterval = purgeInterval } - self.rows = make([]*row, self.HashBits) + self.rows = make([]*row, self.MaxProx) for i, _ := range self.rows { self.rows[i] = &row{} // will initialise row{int(0),[]*entry(nil),sync.Mutex} } @@ -76,6 +86,7 @@ func (self *Cademlia) Start() error { return nil } +// Stop saves the routing table into a persistant form func (self *Cademlia) Stop() { self.lock.Lock() defer self.lock.Unlock() @@ -86,10 +97,13 @@ func (self *Cademlia) Stop() { self.quitC = nil } +// AddPeer is the entry point where new peers are suggested for addition to the peer pool +// peers conform to the peerrInfo interface +// AddPeer(peer) returns an error if it deems the peer unworthy func (self *Cademlia) AddPeer(peer peerInfo) (err error) { self.lock.Lock() defer self.lock.Unlock() - index := self.DistanceTo(peer.Hash()) + index := self.ProximityBin(peer.Hash()) row := self.rows[index] added := row.insert(&entry{peer: peer}) if added { @@ -104,35 +118,41 @@ func (self *Cademlia) AddPeer(peer peerInfo) (err error) { return } +// adjust Prox (proxLimit and proxSize after an insertion of add entries into row r) func (self *Cademlia) adjustProx(r int, add int) { - if self.proxSize+add > self.MaxProxSize && + self.lock.Lock() + defer self.lock.Unlock() + if r >= self.proxLimit && + self.proxSize+add > self.MaxProxBinSize && self.rows[r].len() > 0 { - self.proxLimit-- + self.proxLimit++ } else { self.proxSize += add } } +// updates Prox (proxLimit and proxSize after purging entries) func (self *Cademlia) updateProx() { - var sum, proxSize int - for _, r := range self.rows { - sum += r.len() - if sum <= self.MaxProxSize || r.len() == 0 { - proxSize = sum + self.lock.Lock() + defer self.lock.Unlock() + var sum int + for i := self.MaxProx - 1; i >= 0; i-- { + l := self.rows[i].len() + sum += l + if sum <= self.MaxProxBinSize || l == 0 { + self.proxSize = sum } } - self.lock.Lock() - self.proxSize = proxSize - self.lock.Unlock() } +// GetPeers(target) returns the list of peers belonging to the same proximity bin as the target. The most proximate bin will be the union of the bins between proxLimit and MaxProx. proxLimit is dynamically adjusted so that 1) there is no empty rows in bin < proxLimit and 2) the sum of all items are the maximum possible but lower than MaxProxBinSize func (self *Cademlia) GetPeers(target []byte) (peers []peerInfo) { self.lock.RLock() defer self.lock.RUnlock() - index := self.DistanceTo(target) + index := self.ProximityBin(target) var entries []*entry if index >= self.proxLimit { - for i := self.proxLimit; i < self.HashBits; i++ { + for i := self.proxLimit; i < self.MaxProx; i++ { entries = append(entries, self.rows[i].row...) } } else { @@ -144,11 +164,13 @@ func (self *Cademlia) GetPeers(target []byte) (peers []peerInfo) { return } +// entry wrapper type for peer object adding potentially persisted metadata for offline permanent record type entry struct { peer peerInfo // metadata } +// in situ mutable row type row struct { length int row []*entry @@ -161,6 +183,7 @@ func (self *row) len() int { return self.length } +// insert adds a peer to a row either by appending to existing items if row length does not exceed RowLength, or by replacing the worst entry in the row func (self *row) insert(entry *entry) (added bool) { self.lock.Lock() defer self.lock.Unlock() @@ -176,6 +199,7 @@ func (self *row) insert(entry *entry) (added bool) { return } +// worst expunges the single worst entry in a row, where worst entry is with a peer that has not been active the longests func (self *row) worst() (index int) { var oldest time.Time for i, entry := range self.row { @@ -187,6 +211,8 @@ func (self *row) worst() (index int) { return } +// expunges entries from a row that were last active more that MaxAge ago +// calls Disconnect on entry.peer func (self *row) purge(recently time.Time) { self.lock.Lock() var newRow []*entry @@ -223,25 +249,39 @@ func (self *Cademlia) purgeLoop() { } } -func Xor(one, other []byte) (xor []byte) { - for i := 0; i < len(one); i++ { - xor[i] = one[i] ^ other[i] - } - return -} +/* +Taking the proximity value relative to a fix point x classifies the points in the space (n byte long byte sequences) into bins the items in which are each at most half as distant from x as items in the previous bin. Given a sample of uniformly distrbuted items (a hash function over arbitrary sequence) the proximity scale maps onto series of subsets with cardinalities on a negative exponential scale. + +It also has the property that any two item belonging to the same bin are at most half as distant from each other as they are from x. + +If we think of random sample of items in the bins as connections in a network of interconnected nodes than relative proximity can serve as the basis for local decisions for graph traversal where the task is to find a route between two points. Since in every step of forwarding, the finite distance halves, there is a guaranteed constant maximum limit on the number of hops needed to reach one node from the other. +*/ -func (self *Cademlia) DistanceTo(other []byte) (ret int) { - return self.Distance(self.Hash, other) +func (self *Cademlia) ProximityBin(other []byte) (ret int) { + return int(math.Min(float64(self.MaxProx), float64(self.Proximity(self.Hash, other)))) } -func (self *Cademlia) Distance(one, other []byte) (ret int) { +/* +The distance metric MSB(x, y) of two equal length bytesequences x an y is the value of the +binary integer cast of the xor-ed bytesequence most significant bit first. +Proximity(x, y) counts the common zeros in the front of this distance measure. +*/ + +func (self *Cademlia) Proximity(one, other []byte) (ret int) { xor := Xor(one, other) - for i := 0; i < self.HashBits; i++ { + for i := 0; i < self.HashBytes; i++ { for j := 0; j < 8; j++ { if (xor[i]>>uint8(7-j))&0x1 != 0 { return i*8 + j } } } - return self.HashBits*8 - 1 + return self.HashBytes*8 - 1 +} + +func Xor(one, other []byte) (xor []byte) { + for i := 0; i < len(one); i++ { + xor[i] = one[i] ^ other[i] + } + return } From 88533a37ab6f8828bc8bf0437aa003d484f3c423 Mon Sep 17 00:00:00 2001 From: zelig Date: Fri, 9 Jan 2015 17:52:30 +0000 Subject: [PATCH 17/17] remove TODO --- p2p/TODO | 8 -------- 1 file changed, 8 deletions(-) delete mode 100644 p2p/TODO diff --git a/p2p/TODO b/p2p/TODO deleted file mode 100644 index 20ede98299ea..000000000000 --- a/p2p/TODO +++ /dev/null @@ -1,8 +0,0 @@ - - -- make sure lastActive is saved at disconnect -- put peer selection loop for inbound connections into verifyPeer -- use peerError in peer selector - - -