diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj
index beeca5b342a..8efeea2fb6a 100644
--- a/Builds/VisualStudio2013/RippleD.vcxproj
+++ b/Builds/VisualStudio2013/RippleD.vcxproj
@@ -2720,31 +2720,6 @@
-
-
-
-
-
-
-
-
-
-
-
- True
-
-
- True
-
-
-
-
- True
-
-
-
-
-
@@ -3108,6 +3083,49 @@
True
+
+
+
+
+
+
+
+
+
+ True
+
+
+ True
+
+
+
+
+ True
+
+
+
+
+ True
+
+
+
+
+
+
+
+
+ True
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters
index b5a74e86cc5..f74b03278eb 100644
--- a/Builds/VisualStudio2013/RippleD.vcxproj.filters
+++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters
@@ -433,15 +433,6 @@
{643F2F19-8319-49F7-7DF1-4BF4F52BF4BF}
-
- {AE687926-F53E-728F-2D48-A1DEC283E86E}
-
-
- {909364A1-443C-393A-3505-D36E048D7181}
-
-
- {C04BC25B-9F43-7B79-91FE-6C62B0E206EE}
-
{4DB18141-EDCD-D13F-A8D3-407F5EF72217}
@@ -469,6 +460,12 @@
{3FDC73B9-A9C7-3533-FD40-E4032B9FA9FB}
+
+ {51E817F6-DC2E-045A-9009-37E3D16D0990}
+
+
+ {07E4BC73-2B68-D0D1-D922-FEBBB573F503}
+
{186385AD-A056-FA3A-7E0E-759EB55E9EAB}
@@ -3858,39 +3855,6 @@
ripple\module\net\rpc
-
- ripple\module\overlay\api
-
-
- ripple\module\overlay\api
-
-
- ripple\module\overlay\api
-
-
- ripple\module\overlay\api
-
-
- ripple\module\overlay\api
-
-
- ripple\module\overlay\impl
-
-
- ripple\module\overlay\impl
-
-
- ripple\module\overlay\impl
-
-
- ripple\module\overlay\impl
-
-
- ripple\module\overlay\impl
-
-
- ripple\module\overlay\impl
-
ripple\module\rpc
@@ -4293,6 +4257,63 @@
ripple\nodestore\tests
+
+ ripple\overlay
+
+
+ ripple\overlay
+
+
+ ripple\overlay
+
+
+ ripple\overlay
+
+
+ ripple\overlay\impl
+
+
+ ripple\overlay\impl
+
+
+ ripple\overlay\impl
+
+
+ ripple\overlay\impl
+
+
+ ripple\overlay\impl
+
+
+ ripple\overlay\impl
+
+
+ ripple\overlay\impl
+
+
+ ripple\overlay\impl
+
+
+ ripple\overlay\impl
+
+
+ ripple\overlay\impl
+
+
+ ripple\overlay\impl
+
+
+ ripple\overlay\impl
+
+
+ ripple\overlay\impl
+
+
+ ripple\overlay
+
+
+ ripple\overlay
+
ripple\peerfinder\api
diff --git a/src/beast/beast/asio/abstract_socket.h b/src/beast/beast/asio/abstract_socket.h
index ec3c5cbcddf..ef2b1dba9cc 100644
--- a/src/beast/beast/asio/abstract_socket.h
+++ b/src/beast/beast/asio/abstract_socket.h
@@ -22,7 +22,7 @@
#include
#include
-
+#include
#include
#include
diff --git a/src/beast/beast/module/asio/protocol/InputParser.h b/src/beast/beast/module/asio/protocol/InputParser.h
index a1323468f7b..2e6117a77a6 100644
--- a/src/beast/beast/module/asio/protocol/InputParser.h
+++ b/src/beast/beast/module/asio/protocol/InputParser.h
@@ -21,6 +21,9 @@
#define BEAST_ASIO_HANDSHAKE_INPUTPARSER_H_INCLUDED
#include
+#include
+
+#include
namespace beast {
namespace asio {
diff --git a/src/ripple/common/MultiSocket.h b/src/ripple/common/MultiSocket.h
index b013b794b95..d73c39a5e40 100644
--- a/src/ripple/common/MultiSocket.h
+++ b/src/ripple/common/MultiSocket.h
@@ -23,8 +23,8 @@
#include
#include
#include
-
#include
+#include
namespace ripple {
diff --git a/src/ripple/module/app/consensus/LedgerConsensus.cpp b/src/ripple/module/app/consensus/LedgerConsensus.cpp
index 6dd86340553..c57445d3e00 100644
--- a/src/ripple/module/app/consensus/LedgerConsensus.cpp
+++ b/src/ripple/module/app/consensus/LedgerConsensus.cpp
@@ -17,7 +17,7 @@
*/
//==============================================================================
-#include
+#include
namespace ripple {
diff --git a/src/ripple/module/app/ledger/InboundLedger.cpp b/src/ripple/module/app/ledger/InboundLedger.cpp
index ad3be871f90..a0ec166f36a 100644
--- a/src/ripple/module/app/ledger/InboundLedger.cpp
+++ b/src/ripple/module/app/ledger/InboundLedger.cpp
@@ -17,7 +17,7 @@
*/
//==============================================================================
-#include
+#include
#include
namespace ripple {
@@ -502,7 +502,7 @@ void InboundLedger::trigger (Peer::ptr const& peer)
if (iPeer)
{
mByHash = false;
- iPeer->sendPacket (packet, false);
+ iPeer->send (packet);
}
}
}
diff --git a/src/ripple/module/app/ledger/LedgerMaster.cpp b/src/ripple/module/app/ledger/LedgerMaster.cpp
index e5e421e8922..b7d9a0c6632 100644
--- a/src/ripple/module/app/ledger/LedgerMaster.cpp
+++ b/src/ripple/module/app/ledger/LedgerMaster.cpp
@@ -550,7 +550,7 @@ class LedgerMasterImp
tmBH.set_ledgerhash (nextLedger->getHash().begin (), 32);
Message::pointer packet = std::make_shared (tmBH, protocol::mtGET_OBJECTS);
- target->sendPacket (packet, false);
+ target->send (packet);
WriteLog (lsTRACE, LedgerMaster) << "Requested fetch pack for " << nextLedger->getLedgerSeq() - 1;
}
else
diff --git a/src/ripple/module/app/main/Application.cpp b/src/ripple/module/app/main/Application.cpp
index 00aa006b0f7..88ea27afae8 100644
--- a/src/ripple/module/app/main/Application.cpp
+++ b/src/ripple/module/app/main/Application.cpp
@@ -25,13 +25,15 @@
#include
#include
#include
-#include
-#include
#include
#include
#include
#include
-
+#include
+#include
+#include
+#include
+
namespace ripple {
// VFALCO TODO Clean this global up
diff --git a/src/ripple/module/app/misc/NetworkOPs.cpp b/src/ripple/module/app/misc/NetworkOPs.cpp
index b65f3308ee4..98ba66270b0 100644
--- a/src/ripple/module/app/misc/NetworkOPs.cpp
+++ b/src/ripple/module/app/misc/NetworkOPs.cpp
@@ -18,8 +18,8 @@
//==============================================================================
#include
+#include
#include
-#include
#include
#include
#include
@@ -3288,7 +3288,7 @@ void NetworkOPsImp::makeFetchPack (Job&, std::weak_ptr wPeer,
m_journal.info << "Built fetch pack with " << reply.objects ().size () << " nodes";
Message::pointer msg = std::make_shared (reply, protocol::mtGET_OBJECTS);
- peer->sendPacket (msg, false);
+ peer->send (msg);
}
catch (...)
{
diff --git a/src/ripple/module/app/peers/PeerSet.cpp b/src/ripple/module/app/peers/PeerSet.cpp
index b277763df32..430687430da 100644
--- a/src/ripple/module/app/peers/PeerSet.cpp
+++ b/src/ripple/module/app/peers/PeerSet.cpp
@@ -17,7 +17,7 @@
*/
//==============================================================================
-#include
+#include
#include
namespace ripple {
@@ -151,7 +151,7 @@ void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL, Peer::ptr const& p
if (!peer)
sendRequest (tmGL);
else
- peer->sendPacket (std::make_shared (tmGL, protocol::mtGET_LEDGER), false);
+ peer->send (std::make_shared (tmGL, protocol::mtGET_LEDGER));
}
void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL)
@@ -169,7 +169,7 @@ void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL)
Peer::ptr peer (getApp().overlay ().findPeerByShortID (p.first));
if (peer)
- peer->sendPacket (packet, false);
+ peer->send (packet);
}
}
diff --git a/src/ripple/module/app/peers/PeerSet.h b/src/ripple/module/app/peers/PeerSet.h
index e995a87ac48..a92313cdd07 100644
--- a/src/ripple/module/app/peers/PeerSet.h
+++ b/src/ripple/module/app/peers/PeerSet.h
@@ -20,7 +20,7 @@
#ifndef RIPPLE_PEERSET_H
#define RIPPLE_PEERSET_H
-#include
+#include
namespace ripple {
diff --git a/src/ripple/module/app/tx/TransactionAcquire.cpp b/src/ripple/module/app/tx/TransactionAcquire.cpp
index 40d8a93da4e..a9ce36800dd 100644
--- a/src/ripple/module/app/tx/TransactionAcquire.cpp
+++ b/src/ripple/module/app/tx/TransactionAcquire.cpp
@@ -17,7 +17,7 @@
*/
//==============================================================================
-#include
+#include
namespace ripple {
diff --git a/src/ripple/module/overlay/README.md b/src/ripple/module/overlay/README.md
deleted file mode 100644
index edb3f862d2b..00000000000
--- a/src/ripple/module/overlay/README.md
+++ /dev/null
@@ -1,16 +0,0 @@
-# Overlay
-
-## Introduction
-
-The _Ripple payment network_ consists of a collection of _peers_ running the
-**rippled software**. Each peer maintains multiple outgoing connections and
-optional incoming connections to other peers. These connections are made over
-both the public Internet and private local area networks. This network defines
-a fully connected directed graph of nodes. Peers send and receive messages to
-other connected peers. This peer to peer network, layered on top of the public
-and private Internet, forms an [_overlay network_][overlay_network]. The
-contents of the messages and the behavior of peers in response to the messages,
-plus the information exchanged during the handshaking phase of connection
-establishment, defines the _Peer Protocol_.
-
-[overlay_network]: http://en.wikipedia.org/wiki/Overlay_network
\ No newline at end of file
diff --git a/src/ripple/module/overlay/impl/PeerImp.h b/src/ripple/module/overlay/impl/PeerImp.h
deleted file mode 100644
index b91c8269712..00000000000
--- a/src/ripple/module/overlay/impl/PeerImp.h
+++ /dev/null
@@ -1,2861 +0,0 @@
-//------------------------------------------------------------------------------
-/*
- This file is part of rippled: https://github.com/ripple/rippled
- Copyright (c) 2012, 2013 Ripple Labs Inc.
-
- Permission to use, copy, modify, and/or distribute this software for any
- purpose with or without fee is hereby granted, provided that the above
- copyright notice and this permission notice appear in all copies.
-
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-*/
-//==============================================================================
-
-#ifndef RIPPLE_OVERLAY_PEERIMP_H_INCLUDED
-#define RIPPLE_OVERLAY_PEERIMP_H_INCLUDED
-
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-
-// VFALCO This is unfortunate. Comment this out and
-// just include what is needed.
-#include
-
-#include
-#include
-
-#include
-
-namespace ripple {
-
-typedef boost::asio::ip::tcp::socket NativeSocketType;
-
-class PeerImp;
-
-std::string to_string (Peer const& peer);
-std::ostream& operator<< (std::ostream& os, Peer const& peer);
-
-std::string to_string (Peer const* peer);
-std::ostream& operator<< (std::ostream& os, Peer const* peer);
-
-std::string to_string (PeerImp const& peer);
-std::ostream& operator<< (std::ostream& os, PeerImp const& peer);
-
-std::string to_string (PeerImp const* peer);
-std::ostream& operator<< (std::ostream& os, PeerImp const* peer);
-
-//------------------------------------------------------------------------------
-
-class PeerImp
- : public Peer
- , public std::enable_shared_from_this
- , private beast::LeakChecked
-{
-private:
- /** Time alloted for a peer to send a HELLO message (DEPRECATED) */
- static const boost::posix_time::seconds nodeVerifySeconds;
-
- /** The clock drift we allow a remote peer to have */
- static const std::uint32_t clockToleranceDeltaSeconds = 20;
-
- /** The length of the smallest valid finished message */
- static const size_t sslMinimumFinishedLength = 12;
-
- //--------------------------------------------------------------------------
- /** We have accepted an inbound connection.
-
- The connection state transitions from `stateConnect` to `stateConnected`
- as `stateConnect`.
- */
- void accept ()
- {
- m_journal.info << "Accepted " << m_remoteAddress;
-
- m_socket->set_verify_mode (boost::asio::ssl::verify_none);
- m_socket->async_handshake (
- boost::asio::ssl::stream_base::server,
- m_strand.wrap (std::bind (
- &PeerImp::handleStart,
- std::static_pointer_cast (shared_from_this ()),
- beast::asio::placeholders::error)));
- }
-
- /** Attempt an outbound connection.
-
- The connection may fail (for a number of reasons) and we do not know
- what will happen at this point.
-
- The connection state does not transition with this function and remains
- as `stateConnecting`.
- */
- void connect ()
- {
- m_journal.info << "Connecting to " << m_remoteAddress;
-
- boost::system::error_code err;
-
- m_timer.expires_from_now (nodeVerifySeconds, err);
-
- m_timer.async_wait (m_strand.wrap (std::bind (
- &PeerImp::handleVerifyTimer,
- shared_from_this (), beast::asio::placeholders::error)));
-
- if (err)
- {
- m_journal.error << "Failed to set verify timer.";
- detach ("c2");
- return;
- }
-
- m_socket->next_layer ().async_connect (
- beast::IPAddressConversion::to_asio_endpoint (m_remoteAddress),
- m_strand.wrap (std::bind (&PeerImp::onConnect,
- shared_from_this (), beast::asio::placeholders::error)));
- }
-
-public:
- /** Current state */
- enum State
- {
- /** An connection is being established (outbound) */
- stateConnecting
-
- /** Connection has been successfully established */
- ,stateConnected
-
- /** Handshake has been received from this peer */
- ,stateHandshaked
-
- /** Running the Ripple protocol actively */
- ,stateActive
-
- /** Gracefully closing */
- ,stateGracefulClose
- };
-
- typedef std::shared_ptr ptr;
-
- NativeSocketType m_owned_socket;
-
- beast::Journal m_journal;
-
- // A unique identifier (up to a restart of rippled) for this particular
- // peer instance. A peer that disconnects will, upon reconnection, get a
- // new ID.
- ShortId m_shortId;
-
- // Updated at each stage of the connection process to reflect
- // the current conditions as closely as possible. This includes
- // the case where we learn the true IP via a PROXY handshake.
- beast::IP::Endpoint m_remoteAddress;
-
- // These is up here to prevent warnings about order of initializations
- //
- Resource::Manager& m_resourceManager;
- PeerFinder::Manager& m_peerFinder;
- OverlayImpl& m_overlay;
- bool m_inbound;
-
- std::unique_ptr m_socket;
- boost::asio::io_service::strand m_strand;
-
- State m_state; // Current state
- bool m_detaching; // True if detaching.
- bool m_clusterNode; // True if peer is a node in our cluster
- RippleAddress m_nodePublicKey; // Node public key of peer.
- std::string m_nodeName;
-
- // Both sides of the peer calculate this value and verify that it matches
- // to detect/prevent man-in-the-middle attacks.
- //
- uint256 m_secureCookie;
-
- // The indices of the smallest and largest ledgers this peer has available
- //
- LedgerIndex m_minLedger;
- LedgerIndex m_maxLedger;
-
- uint256 m_closedLedgerHash;
- uint256 m_previousLedgerHash;
-
- std::list m_recentLedgers;
- std::list m_recentTxSets;
- mutable std::mutex m_recentLock;
-
- boost::asio::deadline_timer m_timer;
-
- std::vector m_readBuffer;
- std::list mSendQ;
- Message::pointer mSendingPacket;
- protocol::TMStatusChange mLastStatus;
- protocol::TMHello mHello;
-
- Resource::Consumer m_usage;
-
- // The slot assigned to us by PeerFinder
- PeerFinder::Slot::ptr m_slot;
-
- // True if close was called
- bool m_was_canceled;
-
- //--------------------------------------------------------------------------
- /** New incoming peer from the specified socket */
- PeerImp (
- NativeSocketType&& socket,
- beast::IP::Endpoint remoteAddress,
- OverlayImpl& overlay,
- Resource::Manager& resourceManager,
- PeerFinder::Manager& peerFinder,
- PeerFinder::Slot::ptr const& slot,
- boost::asio::ssl::context& ssl_context,
- MultiSocket::Flag flags)
- : m_owned_socket (std::move (socket))
- , m_journal (LogPartition::getJournal ())
- , m_shortId (0)
- , m_remoteAddress (remoteAddress)
- , m_resourceManager (resourceManager)
- , m_peerFinder (peerFinder)
- , m_overlay (overlay)
- , m_inbound (true)
- , m_socket (MultiSocket::New (
- m_owned_socket, ssl_context, flags.asBits ()))
- , m_strand (m_owned_socket.get_io_service())
- , m_state (stateConnected)
- , m_detaching (false)
- , m_clusterNode (false)
- , m_minLedger (0)
- , m_maxLedger (0)
- , m_timer (m_owned_socket.get_io_service())
- , m_slot (slot)
- , m_was_canceled (false)
- {
- }
-
- /** New outgoing peer
- @note Construction of outbound peers is a two step process: a second
- call is needed (to connect or accept) but we cannot make it from
- inside the constructor because you cannot call shared_from_this
- from inside constructors.
- */
- PeerImp (
- beast::IP::Endpoint remoteAddress,
- boost::asio::io_service& io_service,
- OverlayImpl& overlay,
- Resource::Manager& resourceManager,
- PeerFinder::Manager& peerFinder,
- PeerFinder::Slot::ptr const& slot,
- boost::asio::ssl::context& ssl_context,
- MultiSocket::Flag flags)
- : m_owned_socket (io_service)
- , m_journal (LogPartition::getJournal ())
- , m_shortId (0)
- , m_remoteAddress (remoteAddress)
- , m_resourceManager (resourceManager)
- , m_peerFinder (peerFinder)
- , m_overlay (overlay)
- , m_inbound (false)
- , m_socket (MultiSocket::New (
- io_service, ssl_context, flags.asBits ()))
- , m_strand (io_service)
- , m_state (stateConnecting)
- , m_detaching (false)
- , m_clusterNode (false)
- , m_minLedger (0)
- , m_maxLedger (0)
- , m_timer (io_service)
- , m_slot (slot)
- , m_was_canceled (false)
- {
- }
-
- virtual
- ~PeerImp ()
- {
- m_overlay.remove (m_slot);
- }
-
- PeerImp (PeerImp const&) = delete;
- PeerImp& operator= (PeerImp const&) = delete;
-
- MultiSocket& getStream ()
- {
- return *m_socket;
- }
-
- static char const* getCountedObjectName () { return "Peer"; }
-
- //--------------------------------------------------------------------------
-
- State state() const
- {
- return m_state;
- }
-
- void state (State new_state)
- {
- m_state = new_state;
- }
-
- //--------------------------------------------------------------------------
- /** Disconnect a peer
-
- The peer transitions from its current state into `stateGracefulClose`
-
- @param rsn a code indicating why the peer was disconnected
- @param onIOStrand true if called on an I/O strand. It if is not, then
- a callback will be queued up.
- */
- void detach (const char* rsn, bool graceful = true)
- {
- if (! m_strand.running_in_this_thread ())
- {
- m_strand.post (std::bind (&PeerImp::detach,
- shared_from_this (), rsn, graceful));
- return;
- }
-
- if (!m_detaching)
- {
- // NIKB TODO No - a race is NOT ok. This needs to be fixed
- // to have PeerFinder work reliably.
- m_detaching = true; // Race is ok.
-
- if (m_was_canceled)
- m_peerFinder.on_cancel (m_slot);
- else
- m_peerFinder.on_closed (m_slot);
-
- if (m_state == stateActive)
- m_overlay.onPeerDisconnect (shared_from_this ());
-
- m_state = stateGracefulClose;
-
- if (m_clusterNode && m_journal.active(beast::Journal::Severity::kWarning))
- m_journal.warning << "Cluster peer " << m_nodeName <<
- " detached: " << rsn;
-
- mSendQ.clear ();
-
- (void) m_timer.cancel ();
-
- if (graceful)
- {
- m_socket->async_shutdown (
- m_strand.wrap ( std::bind(
- &PeerImp::handleShutdown,
- std::static_pointer_cast (shared_from_this ()),
- beast::asio::placeholders::error)));
- }
- else
- {
- m_socket->cancel ();
- }
-
- // VFALCO TODO Stop doing this.
- if (m_nodePublicKey.isValid ())
- m_nodePublicKey.clear (); // Be idempotent.
- }
- }
-
- /** Close the connection. */
- void close (bool graceful)
- {
- m_was_canceled = true;
- detach ("stop", graceful);
- }
-
- /** Outbound connection attempt has completed (not necessarily successfully)
-
- The connection may fail for a number of reasons. Perhaps we do not have
- a route to the remote endpoint, or there is no server listening at that
- address.
-
- If the connection succeeded, we transition to the `stateConnected` state
- and move on.
-
- If the connection failed, we simply disconnect.
-
- @param ec indicates success or an error code.
- */
- void onConnect (boost::system::error_code ec)
- {
- if (m_detaching)
- return;
-
- NativeSocketType::endpoint_type local_endpoint;
-
- if (! ec)
- local_endpoint = m_socket->this_layer <
- NativeSocketType> ().local_endpoint (ec);
-
- if (ec)
- {
- // VFALCO NOTE This log statement looks like ass
- m_journal.info <<
- "Connect to " << m_remoteAddress <<
- " failed: " << ec.message();
- // This should end up calling onPeerClosed()
- detach ("hc");
- return;
- }
-
- bassert (m_state == stateConnecting);
- m_state = stateConnected;
-
- m_peerFinder.on_connected (m_slot,
- beast::IPAddressConversion::from_asio (local_endpoint));
-
- m_socket->set_verify_mode (boost::asio::ssl::verify_none);
- m_socket->async_handshake (
- boost::asio::ssl::stream_base::client,
- m_strand.wrap (std::bind (&PeerImp::handleStart,
- std::static_pointer_cast (shared_from_this ()),
- beast::asio::placeholders::error)));
- }
-
- /** Indicates that the peer must be activated.
- A peer is activated after the handshake is completed and if it is not
- a second connection from a peer that we already have. Once activated
- the peer transitions to `stateActive` and begins operating.
- */
- void activate ()
- {
- bassert (m_state == stateHandshaked);
- m_state = stateActive;
- bassert(m_shortId == 0);
- m_shortId = m_overlay.next_id();
- m_overlay.onPeerActivated(shared_from_this ());
- }
-
- void start ()
- {
- if (m_inbound)
- accept ();
- else
- connect ();
- }
-
- //--------------------------------------------------------------------------
- std::string getClusterNodeName() const
- {
- return m_nodeName;
- }
-
- //--------------------------------------------------------------------------
-
- void sendPacket (const Message::pointer& packet, bool onStrand)
- {
- if (packet)
- {
- if (!onStrand)
- {
- m_strand.post (std::bind (
- &Peer::sendPacket, shared_from_this (), packet, true));
- return;
- }
-
- if (mSendingPacket)
- {
- mSendQ.push_back (packet);
- }
- else
- {
- sendPacketForce (packet);
- }
- }
- }
-
- void sendGetPeers ()
- {
- // Ask peer for known other peers.
- protocol::TMGetPeers msg;
-
- msg.set_doweneedthis (1);
-
- Message::pointer packet = std::make_shared (
- msg, protocol::mtGET_PEERS);
-
- sendPacket (packet, true);
- }
-
- void charge (Resource::Charge const& fee)
- {
- if ((m_usage.charge (fee) == Resource::drop) && m_usage.disconnect ())
- detach ("resource");
- }
-
- static void charge (std::weak_ptr & peer, Resource::Charge const& fee)
- {
- Peer::ptr p (peer.lock());
-
- if (p != nullptr)
- p->charge (fee);
- }
-
- Json::Value json ()
- {
- Json::Value ret (Json::objectValue);
-
- ret["public_key"] = m_nodePublicKey.ToString ();
- ret["address"] = m_remoteAddress.to_string();
-
- if (m_inbound)
- ret["inbound"] = true;
-
- if (m_clusterNode)
- {
- ret["cluster"] = true;
-
- if (!m_nodeName.empty ())
- ret["name"] = m_nodeName;
- }
-
- if (mHello.has_fullversion ())
- ret["version"] = mHello.fullversion ();
-
- if (mHello.has_protoversion () &&
- (mHello.protoversion () != BuildInfo::getCurrentProtocol().toPacked ()))
- {
- ret["protocol"] = BuildInfo::Protocol (mHello.protoversion ()).toStdString ();
- }
-
- std::uint32_t minSeq, maxSeq;
- ledgerRange(minSeq, maxSeq);
-
- if ((minSeq != 0) || (maxSeq != 0))
- ret["complete_ledgers"] = boost::lexical_cast(minSeq) + " - " +
- boost::lexical_cast(maxSeq);
-
- if (!!m_closedLedgerHash)
- ret["ledger"] = to_string (m_closedLedgerHash);
-
- if (mLastStatus.has_newstatus ())
- {
- switch (mLastStatus.newstatus ())
- {
- case protocol::nsCONNECTING:
- ret["status"] = "connecting";
- break;
-
- case protocol::nsCONNECTED:
- ret["status"] = "connected";
- break;
-
- case protocol::nsMONITORING:
- ret["status"] = "monitoring";
- break;
-
- case protocol::nsVALIDATING:
- ret["status"] = "validating";
- break;
-
- case protocol::nsSHUTTING:
- ret["status"] = "shutting";
- break;
-
- default:
- // FIXME: do we really want this?
- m_journal.warning << "Unknown status: " << mLastStatus.newstatus ();
- }
- }
-
- return ret;
- }
-
- bool isInCluster () const
- {
- return m_clusterNode;
- }
-
- uint256 const& getClosedLedgerHash () const
- {
- return m_closedLedgerHash;
- }
-
- bool hasLedger (uint256 const& hash, std::uint32_t seq) const
- {
- std::lock_guard sl(m_recentLock);
-
- if ((seq != 0) && (seq >= m_minLedger) && (seq <= m_maxLedger))
- return true;
-
- BOOST_FOREACH (uint256 const & ledger, m_recentLedgers)
- {
- if (ledger == hash)
- return true;
- }
-
- return false;
- }
-
- void ledgerRange (std::uint32_t& minSeq, std::uint32_t& maxSeq) const
- {
- std::lock_guard sl(m_recentLock);
-
- minSeq = m_minLedger;
- maxSeq = m_maxLedger;
- }
-
- bool hasTxSet (uint256 const& hash) const
- {
- std::lock_guard sl(m_recentLock);
- BOOST_FOREACH (uint256 const & set, m_recentTxSets)
-
- if (set == hash)
- return true;
-
- return false;
- }
-
- Peer::ShortId getShortId () const
- {
- return m_shortId;
- }
-
- const RippleAddress& getNodePublic () const
- {
- return m_nodePublicKey;
- }
-
- void cycleStatus ()
- {
- m_previousLedgerHash = m_closedLedgerHash;
- m_closedLedgerHash.zero ();
- }
-
- bool supportsVersion (int version)
- {
- return mHello.has_protoversion () && (mHello.protoversion () >= version);
- }
-
- bool hasRange (std::uint32_t uMin, std::uint32_t uMax)
- {
- return (uMin >= m_minLedger) && (uMax <= m_maxLedger);
- }
-
- beast::IP::Endpoint getRemoteAddress() const
- {
- return m_remoteAddress;
- }
-
-private:
- void handleShutdown (boost::system::error_code const& ec)
- {
- if (m_detaching)
- return;
-
- if (ec == boost::asio::error::operation_aborted)
- return;
-
- if (ec)
- {
- m_journal.info << "Shutdown: " << ec.message ();
- detach ("hsd");
- return;
- }
- }
-
- void handleWrite (boost::system::error_code const& ec, size_t bytes)
- {
- if (m_detaching)
- return;
-
- // Call on IO strand
-
- mSendingPacket.reset ();
-
- if (ec == boost::asio::error::operation_aborted)
- return;
-
- if (m_detaching)
- return;
-
- if (ec)
- {
- m_journal.info << "Write: " << ec.message ();
- detach ("hw");
- return;
- }
-
- if (!mSendQ.empty ())
- {
- Message::pointer packet = mSendQ.front ();
-
- if (packet)
- {
- sendPacketForce (packet);
- mSendQ.pop_front ();
- }
- }
- }
-
- void handleReadHeader (boost::system::error_code const& ec,
- std::size_t bytes)
- {
- if (m_detaching)
- return;
-
- if (ec == boost::asio::error::operation_aborted)
- return;
-
- if (ec)
- {
- m_journal.info << "ReadHeader: " << ec.message ();
- detach ("hrh1");
- return;
- }
-
- unsigned msg_len = Message::getLength (m_readBuffer);
-
- // WRITEME: Compare to maximum message length, abort if too large
- if ((msg_len > (32 * 1024 * 1024)) || (msg_len == 0))
- {
- detach ("hrh2");
- return;
- }
-
- startReadBody (msg_len);
- }
-
- void handleReadBody (boost::system::error_code const& ec,
- std::size_t bytes)
- {
- if (m_detaching)
- return;
-
- if (ec == boost::asio::error::operation_aborted)
- return;
-
- if (ec)
- {
- m_journal.info << "ReadBody: " << ec.message ();
-
- {
- Application::ScopedLockType lock (getApp ().getMasterLock ());
- detach ("hrb");
- }
-
- return;
- }
-
- processReadBuffer ();
- startReadHeader ();
- }
-
- // We have an encrypted connection to the peer.
- // Have it say who it is so we know to avoid redundant connections.
- // Establish that it really who we are talking to by having it sign a
- // connection detail. Also need to establish no man in the middle attack
- // is in progress.
- void handleStart (boost::system::error_code const& ec)
- {
- if (m_detaching)
- return;
-
- if (ec == boost::asio::error::operation_aborted)
- return;
-
- if (ec)
- {
- m_journal.info << "Handshake: " << ec.message ();
- detach ("hs");
- return;
- }
-
- if (m_inbound)
- m_usage = m_resourceManager.newInboundEndpoint (m_remoteAddress);
- else
- m_usage = m_resourceManager.newOutboundEndpoint (m_remoteAddress);
-
- if (m_usage.disconnect ())
- {
- detach ("resource");
- return;
- }
-
- if(!sendHello ())
- {
- m_journal.error << "Unable to send HELLO to " << m_remoteAddress;
- detach ("hello");
- return;
- }
-
- startReadHeader ();
- }
-
- void handleVerifyTimer (boost::system::error_code const& ec)
- {
- if (m_detaching)
- return;
-
- if (ec == boost::asio::error::operation_aborted)
- {
- // Timer canceled because deadline no longer needed.
- }
- else if (ec)
- {
- m_journal.info << "Peer verify timer error";
- }
- else
- {
- // m_journal.info << "Verify: Peer failed to verify in time.";
-
- detach ("hvt");
- }
- }
-
- void processReadBuffer ()
- {
- // must not hold peer lock
- int type = Message::getType (m_readBuffer);
-
- LoadEvent::autoptr event (
- getApp().getJobQueue ().getLoadEventAP (jtPEER, "Peer::read"));
-
- {
- // An mtHELLO message must be the first message receiced by a peer
- // and it must be received *exactly* once during a connection; any
- // other scenario constitutes a protocol violation.
-
- if ((m_state == stateConnected) && (type != protocol::mtHELLO))
- {
- m_journal.warning << "Protocol: HELLO expected!";
- detach ("prb-hello-expected");
- return;
- }
-
- if ((m_state == stateActive) && (type == protocol::mtHELLO))
- {
- m_journal.warning << "Protocol: HELLO unexpected!";
- detach ("prb-hello-unexpected");
- return;
- }
-
- size_t msgLen (m_readBuffer.size () - Message::kHeaderBytes);
-
- switch (type)
- {
- case protocol::mtHELLO:
- {
- event->reName ("Peer::hello");
- protocol::TMHello msg;
-
- if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvHello (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtCLUSTER:
- {
- event->reName ("Peer::cluster");
- protocol::TMCluster msg;
-
- if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvCluster (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtERROR_MSG:
- {
- event->reName ("Peer::errormessage");
- protocol::TMErrorMsg msg;
-
- if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvErrorMessage (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtPING:
- {
- event->reName ("Peer::ping");
- protocol::TMPing msg;
-
- if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvPing (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtGET_CONTACTS:
- {
- event->reName ("Peer::getcontacts");
- protocol::TMGetContacts msg;
-
- if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvGetContacts (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtCONTACT:
- {
- event->reName ("Peer::contact");
- protocol::TMContact msg;
-
- if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvContact (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtGET_PEERS:
- {
- event->reName ("Peer::getpeers");
- protocol::TMGetPeers msg;
-
- if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvGetPeers (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtPEERS:
- {
- event->reName ("Peer::peers");
- protocol::TMPeers msg;
-
- if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvPeers (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtENDPOINTS:
- {
- event->reName ("Peer::endpoints");
- protocol::TMEndpoints msg;
-
- if(msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvEndpoints (msg);
- else
- m_journal.warning << "parse error: " << type;;
- }
- break;
-
- case protocol::mtSEARCH_TRANSACTION:
- {
- event->reName ("Peer::searchtransaction");
- protocol::TMSearchTransaction msg;
-
- if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvSearchTransaction (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtGET_ACCOUNT:
- {
- event->reName ("Peer::getaccount");
- protocol::TMGetAccount msg;
-
- if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvGetAccount (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtACCOUNT:
- {
- event->reName ("Peer::account");
- protocol::TMAccount msg;
-
- if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvAccount (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtTRANSACTION:
- {
- event->reName ("Peer::transaction");
- protocol::TMTransaction msg;
-
- if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvTransaction (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtSTATUS_CHANGE:
- {
- event->reName ("Peer::statuschange");
- protocol::TMStatusChange msg;
-
- if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvStatus (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtPROPOSE_LEDGER:
- {
- event->reName ("Peer::propose");
- std::shared_ptr msg (
- std::make_shared ());
-
- if (msg->ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvPropose (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtGET_LEDGER:
- {
- event->reName ("Peer::getledger");
- std::shared_ptr msg (
- std::make_shared ());
-
- if (msg->ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvGetLedger (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtLEDGER_DATA:
- {
- event->reName ("Peer::ledgerdata");
- std::shared_ptr msg (
- std::make_shared ());
-
- if (msg->ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvLedger (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtHAVE_SET:
- {
- event->reName ("Peer::haveset");
- protocol::TMHaveTransactionSet msg;
-
- if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvHaveTxSet (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtVALIDATION:
- {
- event->reName ("Peer::validation");
- std::shared_ptr msg (
- std::make_shared ());
-
- if (msg->ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvValidation (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-#if 0
-
- case protocol::mtGET_VALIDATION:
- {
- protocol::TM msg;
-
- if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen))
- recv (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
-#endif
-
- case protocol::mtGET_OBJECTS:
- {
- event->reName ("Peer::getobjects");
- std::shared_ptr msg =
- std::make_shared ();
-
- if (msg->ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvGetObjectByHash (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
- case protocol::mtPROOFOFWORK:
- {
- event->reName ("Peer::proofofwork");
- protocol::TMProofWork msg;
-
- if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
- msgLen))
- recvProofWork (msg);
- else
- m_journal.warning << "parse error: " << type;
- }
- break;
-
-
- default:
- event->reName ("Peer::unknown");
- m_journal.warning << "Unknown Msg: " << type;
- m_journal.warning << strHex (&m_readBuffer[0], m_readBuffer.size ());
- }
- }
- }
-
- void startReadHeader ()
- {
- if (!m_detaching)
- {
- m_readBuffer.clear ();
- m_readBuffer.resize (Message::kHeaderBytes);
-
- boost::asio::async_read (getStream (),
- boost::asio::buffer (m_readBuffer),
- m_strand.wrap (std::bind (&PeerImp::handleReadHeader,
- std::static_pointer_cast (shared_from_this ()),
- beast::asio::placeholders::error,
- beast::asio::placeholders::bytes_transferred)));
- }
- }
-
- void startReadBody (unsigned msg_len)
- {
- // The first Message::kHeaderBytes bytes of m_readbuf already
- // contains the header. Expand it to fit in the body as well, and
- // start async read into the body.
-
- if (!m_detaching)
- {
- m_readBuffer.resize (Message::kHeaderBytes + msg_len);
-
- boost::asio::async_read (getStream (),
- boost::asio::buffer (
- &m_readBuffer [Message::kHeaderBytes], msg_len),
- m_strand.wrap (std::bind (
- &PeerImp::handleReadBody,
- std::static_pointer_cast (shared_from_this ()),
- beast::asio::placeholders::error,
- beast::asio::placeholders::bytes_transferred)));
- }
- }
-
- void sendPacketForce (const Message::pointer& packet)
- {
- // must be on IO strand
- if (!m_detaching)
- {
- mSendingPacket = packet;
-
- boost::asio::async_write (getStream (),
- boost::asio::buffer (packet->getBuffer ()),
- m_strand.wrap (std::bind (
- &PeerImp::handleWrite,
- std::static_pointer_cast (shared_from_this ()),
- beast::asio::placeholders::error,
- beast::asio::placeholders::bytes_transferred)));
- }
- }
-
- /** Hashes the latest finished message from an SSL stream
-
- @param sslSession the session to get the message from.
- @param hash the buffer into which the hash of the retrieved
- message will be saved. The buffer MUST be at least
- 64 bytes long.
- @param getMessage a pointer to the function to call to retrieve the
- finished message. This be either:
- `SSL_get_finished` or
- `SSL_get_peer_finished`.
-
- @return `true` if successful, `false` otherwise.
-
- */
- bool hashLatestFinishedMessage (const SSL *sslSession, unsigned char *hash,
- size_t (*getFinishedMessage)(const SSL *, void *buf, size_t))
- {
- unsigned char buf[1024];
-
- // Get our finished message and hash it.
- std::memset(hash, 0, 64);
-
- size_t len = getFinishedMessage (sslSession, buf, sizeof (buf));
-
- if(len < sslMinimumFinishedLength)
- return false;
-
- SHA512 (buf, len, hash);
-
- return true;
- }
-
- /** Generates a secure cookie to protect against man-in-the-middle attacks
-
- This function should never fail under normal circumstances and regular
- server operation.
-
- A failure prevents the cookie value from being calculated which is an
- important component of connection security. If this function fails, a
- secure connection cannot be established and the link MUST be dropped.
-
- @return `true` if the cookie was generated, `false` otherwise.
-
- @note failure is an exceptional situation - it should never happen and
- will almost always indicate an active man-in-the-middle attack is
- taking place.
- */
- bool calculateSessionCookie ()
- {
- SSL* ssl = m_socket->ssl_handle ();
-
- if (!ssl)
- {
- m_journal.error << "Cookie generation: No underlying connection";
- return false;
- }
-
- unsigned char sha1[64];
- unsigned char sha2[64];
-
- if (!hashLatestFinishedMessage(ssl, sha1, SSL_get_finished))
- {
- m_journal.error << "Cookie generation: local setup not complete";
- return false;
- }
-
- if (!hashLatestFinishedMessage(ssl, sha2, SSL_get_peer_finished))
- {
- m_journal.error << "Cookie generation: peer setup not complete";
- return false;
- }
-
- // If both messages hash to the same value (i.e. match) something is
- // wrong. This would cause the resulting cookie to be 0.
- if (memcmp (sha1, sha2, sizeof (sha1)) == 0)
- {
- m_journal.error << "Cookie generation: identical finished messages";
- return false;
- }
-
- for (size_t i = 0; i < sizeof (sha1); ++i)
- sha1[i] ^= sha2[i];
-
- // Finally, derive the actual cookie for the values that we have
- // calculated.
- m_secureCookie = Serializer::getSHA512Half (sha1, sizeof(sha1));
-
- return true;
- }
-
- /** Perform a secure handshake with the peer at the other end.
-
- If this function returns false then we cannot guarantee that there
- is no active man-in-the-middle attack taking place and the link
- MUST be disconnected.
-
- @return true if successful, false otherwise.
- */
- bool sendHello ()
- {
- if (!calculateSessionCookie())
- return false;
-
- Blob vchSig;
- getApp().getLocalCredentials ().getNodePrivate ().signNodePrivate (m_secureCookie, vchSig);
-
- protocol::TMHello h;
-
- h.set_protoversion (BuildInfo::getCurrentProtocol().toPacked ());
- h.set_protoversionmin (BuildInfo::getMinimumProtocol().toPacked ());
- h.set_fullversion (BuildInfo::getFullVersionString ());
- h.set_nettime (getApp().getOPs ().getNetworkTimeNC ());
- h.set_nodepublic (getApp().getLocalCredentials ().getNodePublic ().humanNodePublic ());
- h.set_nodeproof (&vchSig[0], vchSig.size ());
- h.set_ipv4port (getConfig ().peerListeningPort);
- h.set_testnet (false);
-
- // We always advertise ourselves as private in the HELLO message. This
- // suppresses the old peer advertising code and allows PeerFinder to
- // take over the functionality.
- h.set_nodeprivate (true);
-
- Ledger::pointer closedLedger = getApp().getLedgerMaster ().getClosedLedger ();
-
- if (closedLedger && closedLedger->isClosed ())
- {
- uint256 hash = closedLedger->getHash ();
- h.set_ledgerclosed (hash.begin (), hash.size ());
- hash = closedLedger->getParentHash ();
- h.set_ledgerprevious (hash.begin (), hash.size ());
- }
-
- Message::pointer packet = std::make_shared (
- h, protocol::mtHELLO);
- sendPacket (packet, true);
-
- return true;
- }
-
- void recvHello (protocol::TMHello& packet)
- {
- bool bDetach = true;
-
- (void) m_timer.cancel ();
-
- std::uint32_t const ourTime (getApp().getOPs ().getNetworkTimeNC ());
- std::uint32_t const minTime (ourTime - clockToleranceDeltaSeconds);
- std::uint32_t const maxTime (ourTime + clockToleranceDeltaSeconds);
-
- #ifdef BEAST_DEBUG
- if (packet.has_nettime ())
- {
- std::int64_t to = ourTime;
- to -= packet.nettime ();
- m_journal.debug << "Connect: time offset " << to;
- }
-
- #endif
-
- BuildInfo::Protocol protocol (packet.protoversion());
-
- if (packet.has_nettime () &&
- ((packet.nettime () < minTime) || (packet.nettime () > maxTime)))
- {
- if (packet.nettime () > maxTime)
- {
- m_journal.info << "Hello: Clock for " << *this <<
- " is off by +" << packet.nettime () - ourTime;
- }
- else if (packet.nettime () < minTime)
- {
- m_journal.info << "Hello: Clock for " << *this <<
- " is off by -" << ourTime - packet.nettime ();
- }
- }
- else if (packet.protoversionmin () > BuildInfo::getCurrentProtocol().toPacked ())
- {
- std::string reqVersion (
- protocol.toStdString ());
-
- std::string curVersion (
- BuildInfo::getCurrentProtocol().toStdString ());
-
- m_journal.info << "Hello: Disconnect: Protocol mismatch [" <<
- "Peer expects " << reqVersion <<
- " and we run " << curVersion << "]";
- }
- else if (!m_nodePublicKey.setNodePublic (packet.nodepublic ()))
- {
- m_journal.info << "Hello: Disconnect: Bad node public key.";
- }
- else if (!m_nodePublicKey.verifyNodePublic (m_secureCookie, packet.nodeproof (), ECDSA::not_strict))
- {
- // Unable to verify they have private key for claimed public key.
- m_journal.info << "Hello: Disconnect: Failed to verify session.";
- }
- else
- {
- // Successful connection.
- m_journal.info << "Hello: Connect: " << m_nodePublicKey.humanNodePublic ();
-
- if ((protocol != BuildInfo::getCurrentProtocol()) &&
- m_journal.active(beast::Journal::Severity::kInfo))
- {
- m_journal.info << "Peer protocol: " << protocol.toStdString ();
- }
-
- mHello = packet;
-
- // Determine if this peer belongs to our cluster and get it's name
- m_clusterNode = getApp().getUNL().nodeInCluster (m_nodePublicKey, m_nodeName);
-
- if(m_clusterNode)
- m_journal.info << "Connected to cluster node " << m_nodeName;
-
- bassert (m_state == stateConnected);
- m_state = stateHandshaked;
-
- m_peerFinder.on_handshake (m_slot, RipplePublicKey(m_nodePublicKey),
- m_clusterNode);
-
- // XXX Set timer: connection is in grace period to be useful.
- // XXX Set timer: connection idle (idle may vary depending on connection type.)
- if ((mHello.has_ledgerclosed ()) && (mHello.ledgerclosed ().size () == (256 / 8)))
- {
- memcpy (m_closedLedgerHash.begin (), mHello.ledgerclosed ().data (), 256 / 8);
-
- if ((mHello.has_ledgerprevious ()) && (mHello.ledgerprevious ().size () == (256 / 8)))
- {
- memcpy (m_previousLedgerHash.begin (), mHello.ledgerprevious ().data (), 256 / 8);
- addLedger (m_previousLedgerHash);
- }
- else
- {
- m_previousLedgerHash.zero ();
- }
- }
-
- bDetach = false;
- }
-
- if (bDetach)
- {
- m_nodePublicKey.clear ();
- detach ("recvh");
- }
- else
- {
- sendGetPeers ();
- }
- }
-
- void recvCluster (protocol::TMCluster& packet)
- {
- if (!m_clusterNode)
- {
- charge (Resource::feeUnwantedData);
- return;
- }
-
- for (int i = 0; i < packet.clusternodes().size(); ++i)
- {
- protocol::TMClusterNode const& node = packet.clusternodes(i);
-
- std::string name;
- if (node.has_nodename())
- name = node.nodename();
- ClusterNodeStatus s(name, node.nodeload(), node.reporttime());
-
- RippleAddress nodePub;
- nodePub.setNodePublic(node.publickey());
-
- getApp().getUNL().nodeUpdate(nodePub, s);
- }
-
- int loadSources = packet.loadsources().size();
- if (loadSources != 0)
- {
- Resource::Gossip gossip;
- gossip.items.reserve (loadSources);
- for (int i = 0; i < packet.loadsources().size(); ++i)
- {
- protocol::TMLoadSource const& node = packet.loadsources (i);
- Resource::Gossip::Item item;
- item.address = beast::IP::Endpoint::from_string (node.name());
- item.balance = node.cost();
- if (item.address != beast::IP::Endpoint())
- gossip.items.push_back(item);
- }
- m_resourceManager.importConsumers (m_nodeName, gossip);
- }
-
- getApp().getFeeTrack().setClusterFee(getApp().getUNL().getClusterFee());
- }
-
-
- void recvTransaction (protocol::TMTransaction& packet)
- {
-
- Serializer s (packet.rawtransaction ());
-
- #ifndef TRUST_NETWORK
- try
- {
- #endif
- SerializerIterator sit (s);
- SerializedTransaction::pointer stx = std::make_shared (std::ref (sit));
- uint256 txID = stx->getTransactionID();
-
- int flags;
-
- if (! getApp().getHashRouter ().addSuppressionPeer (txID, m_shortId, flags))
- {
- // we have seen this transaction recently
- if (flags & SF_BAD)
- {
- charge (Resource::feeInvalidSignature);
- return;
- }
-
- if (!(flags & SF_RETRY))
- return;
- }
-
- m_journal.debug << "Got transaction from peer " << *this << ": " << txID;
-
- if (m_clusterNode)
- flags |= SF_TRUSTED | SF_SIGGOOD;
-
- if (getApp().getJobQueue().getJobCount(jtTRANSACTION) > 100)
- m_journal.info << "Transaction queue is full";
- else if (getApp().getLedgerMaster().getValidatedLedgerAge() > 240)
- m_journal.trace << "No new transactions until synchronized";
- else
- getApp().getJobQueue ().addJob (jtTRANSACTION,
- "recvTransaction->checkTransaction",
- std::bind (
- &PeerImp::checkTransaction, std::placeholders::_1,
- flags, stx,
- std::weak_ptr (shared_from_this ())));
-
- #ifndef TRUST_NETWORK
- }
- catch (...)
- {
- m_journal.warning << "Transaction invalid: " <<
- s.getHex();
- }
- #endif
- }
-
- void recvValidation (const std::shared_ptr& packet)
- {
- std::uint32_t closeTime = getApp().getOPs().getCloseTimeNC();
-
- if (packet->validation ().size () < 50)
- {
- m_journal.warning << "Too small validation from peer";
- charge (Resource::feeInvalidRequest);
- return;
- }
-
- #ifndef TRUST_NETWORK
-
- try
- #endif
- {
- Serializer s (packet->validation ());
- SerializerIterator sit (s);
- SerializedValidation::pointer val = std::make_shared (std::ref (sit), false);
-
- if (closeTime > (120 + val->getFieldU32(sfSigningTime)))
- {
- m_journal.trace << "Validation is more than two minutes old";
- charge (Resource::feeUnwantedData);
- return;
- }
-
- if (! getApp().getHashRouter ().addSuppressionPeer (s.getSHA512Half(), m_shortId))
- {
- m_journal.trace << "Validation is duplicate";
- return;
- }
-
- bool isTrusted = getApp().getUNL ().nodeInUNL (val->getSignerPublic ());
- if (isTrusted || !getApp().getFeeTrack ().isLoadedLocal ())
- {
- getApp().getJobQueue ().addJob (
- isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
- "recvValidation->checkValidation",
- std::bind (
- &PeerImp::checkValidation, std::placeholders::_1,
- &m_overlay, val, isTrusted, m_clusterNode, packet,
- std::weak_ptr (shared_from_this ())));
- }
- else
- m_journal.debug << "Dropping UNTRUSTED validation due to load";
- }
-
- #ifndef TRUST_NETWORK
- catch (...)
- {
- m_journal.warning << "Exception processing validation";
- charge (Resource::feeInvalidRequest);
- }
-
- #endif
- }
-
- void recvGetValidation (protocol::TMGetValidations& packet)
- {
- }
-
- void recvContact (protocol::TMContact& packet)
- {
- }
-
- void recvGetContacts (protocol::TMGetContacts& packet)
- {
- }
-
- // Return a list of your favorite people
- // TODO: filter out all the LAN peers
- void recvGetPeers (protocol::TMGetPeers& packet)
- {
-#if 0
- protocol::TMPeers peers;
-
- // CODEME: This is deprecated because of PeerFinder, but populate the
- // response with some data here anyways, and send if non-empty.
-
- sendPacket (
- std::make_shared (peers, protocol::mtPEERS),
- true);
-#endif
- }
-
- // TODO: filter out all the LAN peers
- void recvPeers (protocol::TMPeers& packet)
- {
- std::vector list;
- list.reserve (packet.nodes().size());
- for (int i = 0; i < packet.nodes ().size (); ++i)
- {
- in_addr addr;
-
- addr.s_addr = packet.nodes (i).ipv4 ();
-
- {
- beast::IP::AddressV4 v4 (ntohl (addr.s_addr));
- beast::IP::Endpoint address (v4, packet.nodes (i).ipv4port ());
- list.push_back (address);
- }
- }
-
- if (! list.empty())
- m_peerFinder.on_legacy_endpoints (list);
- }
-
- void recvEndpoints (protocol::TMEndpoints& packet)
- {
- std::vector endpoints;
-
- endpoints.reserve (packet.endpoints().size());
-
- for (int i = 0; i < packet.endpoints ().size (); ++i)
- {
- PeerFinder::Endpoint endpoint;
- protocol::TMEndpoint const& tm (packet.endpoints(i));
-
- // hops
- endpoint.hops = tm.hops();
-
- // ipv4
- if (endpoint.hops > 0)
- {
- in_addr addr;
- addr.s_addr = tm.ipv4().ipv4();
- beast::IP::AddressV4 v4 (ntohl (addr.s_addr));
- endpoint.address = beast::IP::Endpoint (v4, tm.ipv4().ipv4port ());
- }
- else
- {
- // This Endpoint describes the peer we are connected to.
- // We will take the remote address seen on the socket and
- // store that in the IP::Endpoint. If this is the first time,
- // then we'll verify that their listener can receive incoming
- // by performing a connectivity test.
- //
- endpoint.address = m_remoteAddress.at_port (
- tm.ipv4().ipv4port ());
- }
-
- endpoints.push_back (endpoint);
- }
-
- if (! endpoints.empty())
- m_peerFinder.on_endpoints (m_slot, endpoints);
- }
-
- void recvGetObjectByHash (const std::shared_ptr& ptr)
- {
- protocol::TMGetObjectByHash& packet = *ptr;
-
- if (packet.query ())
- {
- // this is a query
- if (packet.type () == protocol::TMGetObjectByHash::otFETCH_PACK)
- {
- doFetchPack (ptr);
- return;
- }
-
- protocol::TMGetObjectByHash reply;
-
- reply.set_query (false);
-
- if (packet.has_seq ())
- reply.set_seq (packet.seq ());
-
- reply.set_type (packet.type ());
-
- if (packet.has_ledgerhash ())
- reply.set_ledgerhash (packet.ledgerhash ());
-
- // This is a very minimal implementation
- for (int i = 0; i < packet.objects_size (); ++i)
- {
- uint256 hash;
- const protocol::TMIndexedObject& obj = packet.objects (i);
-
- if (obj.has_hash () && (obj.hash ().size () == (256 / 8)))
- {
- memcpy (hash.begin (), obj.hash ().data (), 256 / 8);
- // VFALCO TODO Move this someplace more sensible so we dont
- // need to inject the NodeStore interfaces.
- NodeObject::pointer hObj = getApp().getNodeStore ().fetch (hash);
-
- if (hObj)
- {
- protocol::TMIndexedObject& newObj = *reply.add_objects ();
- newObj.set_hash (hash.begin (), hash.size ());
- newObj.set_data (&hObj->getData ().front (), hObj->getData ().size ());
-
- if (obj.has_nodeid ())
- newObj.set_index (obj.nodeid ());
-
- if (!reply.has_seq () && (hObj->getLedgerIndex () != 0))
- reply.set_seq (hObj->getLedgerIndex ());
- }
- }
- }
-
- m_journal.trace << "GetObjByHash had " << reply.objects_size () <<
- " of " << packet.objects_size () <<
- " for " << to_string (this);
- sendPacket (
- std::make_shared (reply, protocol::mtGET_OBJECTS),
- true);
- }
- else
- {
- // this is a reply
- std::uint32_t pLSeq = 0;
- bool pLDo = true;
- bool progress = false;
-
- for (int i = 0; i < packet.objects_size (); ++i)
- {
- const protocol::TMIndexedObject& obj = packet.objects (i);
-
- if (obj.has_hash () && (obj.hash ().size () == (256 / 8)))
- {
-
- if (obj.has_ledgerseq ())
- {
- if (obj.ledgerseq () != pLSeq)
- {
- if ((pLDo && (pLSeq != 0)) &&
- m_journal.active(beast::Journal::Severity::kDebug))
- m_journal.debug << "Received full fetch pack for " << pLSeq;
-
- pLSeq = obj.ledgerseq ();
- pLDo = !getApp().getOPs ().haveLedger (pLSeq);
-
- if (!pLDo)
- m_journal.debug << "Got pack for " << pLSeq << " too late";
- else
- progress = true;
- }
- }
-
- if (pLDo)
- {
- uint256 hash;
- memcpy (hash.begin (), obj.hash ().data (), 256 / 8);
-
- std::shared_ptr< Blob > data (
- std::make_shared< Blob > (
- obj.data ().begin (), obj.data ().end ()));
-
- getApp().getOPs ().addFetchPack (hash, data);
- }
- }
- }
-
- if ((pLDo && (pLSeq != 0)) &&
- m_journal.active(beast::Journal::Severity::kDebug))
- m_journal.debug << "Received partial fetch pack for " << pLSeq;
-
- if (packet.type () == protocol::TMGetObjectByHash::otFETCH_PACK)
- getApp().getOPs ().gotFetchPack (progress, pLSeq);
- }
- }
-
- void recvPing (protocol::TMPing& packet)
- {
- if (packet.type () == protocol::TMPing::ptPING)
- {
- packet.set_type (protocol::TMPing::ptPONG);
- sendPacket (std::make_shared (packet, protocol::mtPING), true);
- }
- }
-
- void recvErrorMessage (protocol::TMErrorMsg& packet)
- {
- }
-
- void recvSearchTransaction (protocol::TMSearchTransaction& packet)
- {
- }
-
- void recvGetAccount (protocol::TMGetAccount& packet)
- {
- }
-
- void recvAccount (protocol::TMAccount& packet)
- {
- }
-
- void recvGetLedger (std::shared_ptr const& packet)
- {
- getApp().getJobQueue().addJob (jtPACK, "recvGetLedger",
- std::bind (&sGetLedger, std::weak_ptr (shared_from_this ()), packet));
- }
-
- /** A peer has sent us transaction set data */
- static void peerTXData (Job&,
- std::weak_ptr wPeer,
- uint256 const& hash,
- std::shared_ptr pPacket,
- beast::Journal journal)
- {
- std::shared_ptr peer = wPeer.lock ();
- if (!peer)
- return;
-
- protocol::TMLedgerData& packet = *pPacket;
-
- std::list nodeIDs;
- std::list< Blob > nodeData;
- for (int i = 0; i < packet.nodes ().size (); ++i)
- {
- const protocol::TMLedgerNode& node = packet.nodes (i);
-
- if (!node.has_nodeid () || !node.has_nodedata () || (node.nodeid ().size () != 33))
- {
- journal.warning << "LedgerData request with invalid node ID";
- peer->charge (Resource::feeInvalidRequest);
- return;
- }
-
- nodeIDs.push_back (SHAMapNode (node.nodeid ().data (), node.nodeid ().size ()));
- nodeData.push_back (Blob (node.nodedata ().begin (), node.nodedata ().end ()));
- }
-
- SHAMapAddNode san;
- {
- Application::ScopedLockType lock (getApp ().getMasterLock ());
-
- san = getApp().getOPs().gotTXData (peer, hash, nodeIDs, nodeData);
- }
-
- if (san.isInvalid ())
- {
- peer->charge (Resource::feeUnwantedData);
- }
-
- }
-
- void recvLedger (std::shared_ptr const& packet_ptr)
- {
- protocol::TMLedgerData& packet = *packet_ptr;
-
- if (packet.nodes ().size () <= 0)
- {
- m_journal.warning << "Ledger/TXset data with no nodes";
- return;
- }
-
- if (packet.has_requestcookie ())
- {
- Peer::ptr target = m_overlay.findPeerByShortID (packet.requestcookie ());
-
- if (target)
- {
- packet.clear_requestcookie ();
- target->sendPacket (std::make_shared (packet, protocol::mtLEDGER_DATA), false);
- }
- else
- {
- m_journal.info << "Unable to route TX/ledger data reply";
- charge (Resource::feeUnwantedData);
- }
-
- return;
- }
-
- uint256 hash;
-
- if (packet.ledgerhash ().size () != 32)
- {
- m_journal.warning << "TX candidate reply with invalid hash size";
- charge (Resource::feeInvalidRequest);
- return;
- }
-
- memcpy (hash.begin (), packet.ledgerhash ().data (), 32);
-
- if (packet.type () == protocol::liTS_CANDIDATE)
- {
- // got data for a candidate transaction set
-
- getApp().getJobQueue().addJob (jtTXN_DATA, "recvPeerData",
- std::bind (&PeerImp::peerTXData, std::placeholders::_1,
- std::weak_ptr (shared_from_this ()),
- hash, packet_ptr, m_journal));
-
- return;
- }
-
- if (!getApp().getInboundLedgers ().gotLedgerData (hash, shared_from_this(), packet_ptr))
- {
- WriteLog (lsTRACE, Peer) << "Got data for unwanted ledger";
- charge (Resource::feeUnwantedData);
- }
- }
-
- void recvStatus (protocol::TMStatusChange& packet)
- {
- m_journal.trace << "Received status change from peer " <<
- to_string (this);
-
- if (!packet.has_networktime ())
- packet.set_networktime (getApp().getOPs ().getNetworkTimeNC ());
-
- if (!mLastStatus.has_newstatus () || packet.has_newstatus ())
- mLastStatus = packet;
- else
- {
- // preserve old status
- protocol::NodeStatus status = mLastStatus.newstatus ();
- mLastStatus = packet;
- packet.set_newstatus (status);
- }
-
- if (packet.newevent () == protocol::neLOST_SYNC)
- {
- if (!m_closedLedgerHash.isZero ())
- {
- m_journal.trace << "peer has lost sync " << to_string (this);
- m_closedLedgerHash.zero ();
- }
-
- m_previousLedgerHash.zero ();
- return;
- }
-
- if (packet.has_ledgerhash () && (packet.ledgerhash ().size () == (256 / 8)))
- {
- // a peer has changed ledgers
- memcpy (m_closedLedgerHash.begin (), packet.ledgerhash ().data (), 256 / 8);
- addLedger (m_closedLedgerHash);
- m_journal.trace << "peer LCL is " << m_closedLedgerHash <<
- " " << to_string (this);
- }
- else
- {
- m_journal.trace << "peer has no ledger hash" << to_string (this);
- m_closedLedgerHash.zero ();
- }
-
- if (packet.has_ledgerhashprevious () && packet.ledgerhashprevious ().size () == (256 / 8))
- {
- memcpy (m_previousLedgerHash.begin (), packet.ledgerhashprevious ().data (), 256 / 8);
- addLedger (m_previousLedgerHash);
- }
- else m_previousLedgerHash.zero ();
-
- if (packet.has_firstseq () && packet.has_lastseq())
- {
- m_minLedger = packet.firstseq ();
- m_maxLedger = packet.lastseq ();
-
- // Work around some servers that report sequences incorrectly
- if (m_minLedger == 0)
- m_maxLedger = 0;
- if (m_maxLedger == 0)
- m_minLedger = 0;
- }
- }
-
- void recvPropose (const std::shared_ptr& packet)
- {
- assert (packet);
- protocol::TMProposeSet& set = *packet;
-
- if ((set.closetime() + 180) < getApp().getOPs().getCloseTimeNC())
- return;
-
- if ((set.currenttxhash ().size () != 32) || (set.nodepubkey ().size () < 28) ||
- (set.signature ().size () < 56) || (set.nodepubkey ().size () > 128) || (set.signature ().size () > 128))
- {
- m_journal.warning << "Received proposal is malformed";
- charge (Resource::feeInvalidSignature);
- return;
- }
-
- if (set.has_previousledger () && (set.previousledger ().size () != 32))
- {
- m_journal.warning << "Received proposal is malformed";
- charge (Resource::feeInvalidRequest);
- return;
- }
-
- uint256 proposeHash, prevLedger;
- memcpy (proposeHash.begin (), set.currenttxhash ().data (), 32);
-
- if (set.has_previousledger ())
- memcpy (prevLedger.begin (), set.previousledger ().data (), 32);
-
- uint256 suppression = LedgerProposal::computeSuppressionID (proposeHash, prevLedger,
- set.proposeseq(), set.closetime (),
- Blob(set.nodepubkey ().begin (), set.nodepubkey ().end ()),
- Blob(set.signature ().begin (), set.signature ().end ()));
-
- if (! getApp().getHashRouter ().addSuppressionPeer (suppression, m_shortId))
- {
- m_journal.trace << "Received duplicate proposal from peer " << m_shortId;
- return;
- }
-
- RippleAddress signerPublic = RippleAddress::createNodePublic (strCopy (set.nodepubkey ()));
-
- if (signerPublic == getConfig ().VALIDATION_PUB)
- {
- m_journal.trace << "Received our own proposal from peer " << m_shortId;
- return;
- }
-
- bool isTrusted = getApp().getUNL ().nodeInUNL (signerPublic);
- if (!isTrusted && getApp().getFeeTrack ().isLoadedLocal ())
- {
- m_journal.debug << "Dropping UNTRUSTED proposal due to load";
- return;
- }
-
- m_journal.trace << "Received " << (isTrusted ? "trusted" : "UNTRUSTED") <<
- " proposal from " << m_shortId;
-
- uint256 consensusLCL;
-
- {
- Application::ScopedLockType lock (getApp ().getMasterLock ());
- consensusLCL = getApp().getOPs ().getConsensusLCL ();
- }
-
- LedgerProposal::pointer proposal = std::make_shared (
- prevLedger.isNonZero () ? prevLedger : consensusLCL,
- set.proposeseq (), proposeHash, set.closetime (), signerPublic, suppression);
-
- getApp().getJobQueue ().addJob (isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
- "recvPropose->checkPropose", std::bind (
- &PeerImp::checkPropose, std::placeholders::_1, &m_overlay,
- packet, proposal, consensusLCL, m_nodePublicKey,
- std::weak_ptr (shared_from_this ()), m_clusterNode));
- }
-
- void recvHaveTxSet (protocol::TMHaveTransactionSet& packet)
- {
- uint256 hashes;
-
- if (packet.hash ().size () != (256 / 8))
- {
- charge (Resource::feeInvalidRequest);
- return;
- }
-
- uint256 hash;
-
- // VFALCO TODO There should be no use of memcpy() throughout the program.
- // TODO Clean up this magic number
- //
- memcpy (hash.begin (), packet.hash ().data (), 32);
-
- if (packet.status () == protocol::tsHAVE)
- addTxSet (hash);
-
- {
- Application::ScopedLockType lock (getApp ().getMasterLock ());
-
- if (!getApp().getOPs ().hasTXSet (shared_from_this (), hash, packet.status ()))
- charge (Resource::feeUnwantedData);
- }
- }
-
- void recvProofWork (protocol::TMProofWork& packet)
- {
- if (packet.has_response ())
- {
- // this is an answer to a proof of work we requested
- if (packet.response ().size () != (256 / 8))
- {
- charge (Resource::feeInvalidRequest);
- return;
- }
-
- uint256 response;
- memcpy (response.begin (), packet.response ().data (), 256 / 8);
-
- // VFALCO TODO Use a dependency injection here
- PowResult r = getApp().getProofOfWorkFactory ().checkProof (packet.token (), response);
-
- if (r == powOK)
- {
- // credit peer
- // WRITEME
- return;
- }
-
- // return error message
- // WRITEME
- if (r != powTOOEASY)
- {
- charge (Resource::feeBadProofOfWork);
- }
-
- return;
- }
-
- if (packet.has_result ())
- {
- // this is a reply to a proof of work we sent
- // WRITEME
- }
-
- if (packet.has_target () && packet.has_challenge () && packet.has_iterations ())
- {
- // this is a challenge
- // WRITEME: Reject from inbound connections
-
- uint256 challenge, target;
-
- if ((packet.challenge ().size () != (256 / 8)) || (packet.target ().size () != (256 / 8)))
- {
- charge (Resource::feeInvalidRequest);
- return;
- }
-
- memcpy (challenge.begin (), packet.challenge ().data (), 256 / 8);
- memcpy (target.begin (), packet.target ().data (), 256 / 8);
- ProofOfWork::pointer pow = std::make_shared (packet.token (), packet.iterations (),
- challenge, target);
-
- if (!pow->isValid ())
- {
- charge (Resource::feeInvalidRequest);
- return;
- }
-
- #if 0 // Until proof of work is completed, don't do it
- getApp().getJobQueue ().addJob (
- jtPROOFWORK,
- "recvProof->doProof",
- std::bind (&PeerImp::doProofOfWork, std::placeholders::_1,
- std::weak_ptr (shared_from_this ()), pow));
- #endif
-
- return;
- }
-
- WriteLog (lsINFO, Peer) << "Received in valid proof of work object from peer";
- }
-
- void addLedger (uint256 const& hash)
- {
- std::lock_guard sl(m_recentLock);
- BOOST_FOREACH (uint256 const & ledger, m_recentLedgers)
-
- if (ledger == hash)
- return;
-
- if (m_recentLedgers.size () == 128)
- m_recentLedgers.pop_front ();
-
- m_recentLedgers.push_back (hash);
- }
-
- void getLedger (protocol::TMGetLedger& packet)
- {
- SHAMap::pointer map;
- protocol::TMLedgerData reply;
- bool fatLeaves = true, fatRoot = false;
-
- if (packet.has_requestcookie ())
- reply.set_requestcookie (packet.requestcookie ());
-
- std::string logMe;
-
- if (packet.itype () == protocol::liTS_CANDIDATE)
- {
- // Request is for a transaction candidate set
- m_journal.trace << "Received request for TX candidate set data "
- << to_string (this);
-
- if ((!packet.has_ledgerhash () || packet.ledgerhash ().size () != 32))
- {
- charge (Resource::feeInvalidRequest);
- m_journal.warning << "invalid request for TX candidate set data";
- return;
- }
-
- uint256 txHash;
- memcpy (txHash.begin (), packet.ledgerhash ().data (), 32);
-
- {
- Application::ScopedLockType lock (getApp ().getMasterLock ());
- map = getApp().getOPs ().getTXMap (txHash);
- }
-
- if (!map)
- {
- if (packet.has_querytype () && !packet.has_requestcookie ())
- {
- m_journal.debug << "Trying to route TX set request";
-
- struct get_usable_peers
- {
- typedef Overlay::PeerSequence return_type;
-
- Overlay::PeerSequence usablePeers;
- uint256 const& txHash;
- Peer const* skip;
-
- get_usable_peers(uint256 const& hash, Peer const* s)
- : txHash(hash), skip(s)
- { }
-
- void operator() (Peer::ptr const& peer)
- {
- if (peer->hasTxSet (txHash) && (peer.get () != skip))
- usablePeers.push_back (peer);
- }
-
- return_type operator() ()
- {
- return usablePeers;
- }
- };
-
- Overlay::PeerSequence usablePeers (m_overlay.foreach (
- get_usable_peers (txHash, this)));
-
- if (usablePeers.empty ())
- {
- m_journal.info << "Unable to route TX set request";
- return;
- }
-
- Peer::ptr const& selectedPeer = usablePeers[rand () % usablePeers.size ()];
- packet.set_requestcookie (getShortId ());
- selectedPeer->sendPacket (
- std::make_shared (packet, protocol::mtGET_LEDGER),
- false);
- return;
- }
-
- m_journal.error << "We do not have the map our peer wants "
- << to_string (this);
-
- charge (Resource::feeInvalidRequest);
- return;
- }
-
- reply.set_ledgerseq (0);
- reply.set_ledgerhash (txHash.begin (), txHash.size ());
- reply.set_type (protocol::liTS_CANDIDATE);
- fatLeaves = false; // We'll already have most transactions
- fatRoot = true; // Save a pass
- }
- else
- {
- if (getApp().getFeeTrack().isLoadedLocal() && !m_clusterNode)
- {
- m_journal.debug << "Too busy to fetch ledger data";
- return;
- }
-
- // Figure out what ledger they want
- m_journal.trace << "Received request for ledger data "
- << to_string (this);
- Ledger::pointer ledger;
-
- if (packet.has_ledgerhash ())
- {
- uint256 ledgerhash;
-
- if (packet.ledgerhash ().size () != 32)
- {
- charge (Resource::feeInvalidRequest);
- m_journal.warning << "Invalid request";
- return;
- }
-
- memcpy (ledgerhash.begin (), packet.ledgerhash ().data (), 32);
- logMe += "LedgerHash:";
- logMe += to_string (ledgerhash);
- ledger = getApp().getLedgerMaster ().getLedgerByHash (ledgerhash);
-
- if (!ledger && m_journal.trace)
- m_journal.trace << "Don't have ledger " << ledgerhash;
-
- if (!ledger && (packet.has_querytype () && !packet.has_requestcookie ()))
- {
- std::uint32_t seq = 0;
-
- if (packet.has_ledgerseq ())
- seq = packet.ledgerseq ();
-
- Overlay::PeerSequence peerList = m_overlay.getActivePeers ();
- Overlay::PeerSequence usablePeers;
- BOOST_FOREACH (Peer::ptr const& peer, peerList)
- {
- if (peer->hasLedger (ledgerhash, seq) && (peer.get () != this))
- usablePeers.push_back (peer);
- }
-
- if (usablePeers.empty ())
- {
- m_journal.trace << "Unable to route ledger request";
- return;
- }
-
- Peer::ptr const& selectedPeer = usablePeers[rand () % usablePeers.size ()];
- packet.set_requestcookie (getShortId ());
- selectedPeer->sendPacket (
- std::make_shared (packet, protocol::mtGET_LEDGER), false);
- m_journal.debug << "Ledger request routed";
- return;
- }
- }
- else if (packet.has_ledgerseq ())
- {
- if (packet.ledgerseq() < getApp().getLedgerMaster().getEarliestFetch())
- {
- m_journal.debug << "Peer requests early ledger";
- return;
- }
- ledger = getApp().getLedgerMaster ().getLedgerBySeq (packet.ledgerseq ());
- if (!ledger && m_journal.debug)
- m_journal.debug << "Don't have ledger " << packet.ledgerseq ();
- }
- else if (packet.has_ltype () && (packet.ltype () == protocol::ltCURRENT))
- {
- ledger = getApp().getLedgerMaster ().getCurrentLedger ();
- }
- else if (packet.has_ltype () && (packet.ltype () == protocol::ltCLOSED) )
- {
- ledger = getApp().getLedgerMaster ().getClosedLedger ();
-
- if (ledger && !ledger->isClosed ())
- ledger = getApp().getLedgerMaster ().getLedgerBySeq (ledger->getLedgerSeq () - 1);
- }
- else
- {
- charge (Resource::feeInvalidRequest);
- m_journal.warning << "Can't figure out what ledger they want";
- return;
- }
-
- if ((!ledger) || (packet.has_ledgerseq () && (packet.ledgerseq () != ledger->getLedgerSeq ())))
- {
- charge (Resource::feeInvalidRequest);
-
- if (m_journal.warning && ledger)
- m_journal.warning << "Ledger has wrong sequence";
-
- return;
- }
-
- if (!packet.has_ledgerseq() && (ledger->getLedgerSeq() < getApp().getLedgerMaster().getEarliestFetch()))
- {
- m_journal.debug << "Peer requests early ledger";
- return;
- }
-
- // Fill out the reply
- uint256 lHash = ledger->getHash ();
- reply.set_ledgerhash (lHash.begin (), lHash.size ());
- reply.set_ledgerseq (ledger->getLedgerSeq ());
- reply.set_type (packet.itype ());
-
- if (packet.itype () == protocol::liBASE)
- {
- // they want the ledger base data
- m_journal.trace << "They want ledger base data";
- Serializer nData (128);
- ledger->addRaw (nData);
- reply.add_nodes ()->set_nodedata (nData.getDataPtr (), nData.getLength ());
-
- SHAMap::pointer map = ledger->peekAccountStateMap ();
-
- if (map && map->getHash ().isNonZero ())
- {
- // return account state root node if possible
- Serializer rootNode (768);
-
- if (map->getRootNode (rootNode, snfWIRE))
- {
- reply.add_nodes ()->set_nodedata (rootNode.getDataPtr (), rootNode.getLength ());
-
- if (ledger->getTransHash ().isNonZero ())
- {
- map = ledger->peekTransactionMap ();
-
- if (map && map->getHash ().isNonZero ())
- {
- rootNode.erase ();
-
- if (map->getRootNode (rootNode, snfWIRE))
- reply.add_nodes ()->set_nodedata (rootNode.getDataPtr (), rootNode.getLength ());
- }
- }
- }
- }
-
- Message::pointer oPacket = std::make_shared (reply, protocol::mtLEDGER_DATA);
- sendPacket (oPacket, false);
- return;
- }
-
- if (packet.itype () == protocol::liTX_NODE)
- {
- map = ledger->peekTransactionMap ();
- logMe += " TX:";
- logMe += to_string (map->getHash ());
- }
- else if (packet.itype () == protocol::liAS_NODE)
- {
- map = ledger->peekAccountStateMap ();
- logMe += " AS:";
- logMe += to_string (map->getHash ());
- }
- }
-
- if (!map || (packet.nodeids_size () == 0))
- {
- m_journal.warning << "Can't find map or empty request";
- charge (Resource::feeInvalidRequest);
- return;
- }
-
- m_journal.trace << "Request: " << logMe;
-
- for (int i = 0; i < packet.nodeids ().size (); ++i)
- {
- SHAMapNode mn (packet.nodeids (i).data (), packet.nodeids (i).size ());
-
- if (!mn.isValid ())
- {
- m_journal.warning << "Request for invalid node: " << logMe;
- charge (Resource::feeInvalidRequest);
- return;
- }
-
- std::vector nodeIDs;
- std::list< Blob > rawNodes;
-
- try
- {
- if (map->getNodeFat (mn, nodeIDs, rawNodes, fatRoot, fatLeaves))
- {
- assert (nodeIDs.size () == rawNodes.size ());
- m_journal.trace << "getNodeFat got " << rawNodes.size () << " nodes";
- std::vector::iterator nodeIDIterator;
- std::list< Blob >::iterator rawNodeIterator;
-
- for (nodeIDIterator = nodeIDs.begin (), rawNodeIterator = rawNodes.begin ();
- nodeIDIterator != nodeIDs.end (); ++nodeIDIterator, ++rawNodeIterator)
- {
- Serializer nID (33);
- nodeIDIterator->addIDRaw (nID);
- protocol::TMLedgerNode* node = reply.add_nodes ();
- node->set_nodeid (nID.getDataPtr (), nID.getLength ());
- node->set_nodedata (&rawNodeIterator->front (), rawNodeIterator->size ());
- }
- }
- else
- m_journal.warning << "getNodeFat returns false";
- }
- catch (std::exception&)
- {
- std::string info;
-
- if (packet.itype () == protocol::liTS_CANDIDATE)
- info = "TS candidate";
- else if (packet.itype () == protocol::liBASE)
- info = "Ledger base";
- else if (packet.itype () == protocol::liTX_NODE)
- info = "TX node";
- else if (packet.itype () == protocol::liAS_NODE)
- info = "AS node";
-
- if (!packet.has_ledgerhash ())
- info += ", no hash specified";
-
- m_journal.warning << "getNodeFat( " << mn << ") throws exception: " << info;
- }
- }
-
- Message::pointer oPacket = std::make_shared (reply, protocol::mtLEDGER_DATA);
- sendPacket (oPacket, false);
- }
-
- // This is dispatched by the job queue
- static
- void
- sGetLedger (std::weak_ptr wPeer,
- std::shared_ptr packet)
- {
- std::shared_ptr peer = wPeer.lock ();
-
- if (peer)
- peer->getLedger (*packet);
- }
-
- void addTxSet (uint256 const& hash)
- {
- std::lock_guard sl(m_recentLock);
-
- if(std::find (m_recentTxSets.begin (), m_recentTxSets.end (), hash) != m_recentTxSets.end ())
- return;
-
- if (m_recentTxSets.size () == 128)
- m_recentTxSets.pop_front ();
-
- m_recentTxSets.push_back (hash);
- }
-
- void doFetchPack (const std::shared_ptr& packet)
- {
- // VFALCO TODO Invert this dependency using an observer and shared state object.
- // Don't queue fetch pack jobs if we're under load or we already have
- // some queued.
- if (getApp().getFeeTrack ().isLoadedLocal () ||
- (getApp().getLedgerMaster().getValidatedLedgerAge() > 40) ||
- (getApp().getJobQueue().getJobCount(jtPACK) > 10))
- {
- m_journal.info << "Too busy to make fetch pack";
- return;
- }
-
- if (packet->ledgerhash ().size () != 32)
- {
- m_journal.warning << "FetchPack hash size malformed";
- charge (Resource::feeInvalidRequest);
- return;
- }
-
- uint256 hash;
- memcpy (hash.begin (), packet->ledgerhash ().data (), 32);
-
- getApp().getJobQueue ().addJob (jtPACK, "MakeFetchPack",
- std::bind (&NetworkOPs::makeFetchPack, &getApp().getOPs (), std::placeholders::_1,
- std::weak_ptr (shared_from_this ()), packet,
- hash, UptimeTimer::getInstance ().getElapsedSeconds ()));
- }
-
- void doProofOfWork (Job&, std::weak_ptr peer, ProofOfWork::pointer pow)
- {
- if (peer.expired ())
- return;
-
- uint256 solution = pow->solve ();
-
- if (solution.isZero ())
- {
- m_journal.warning << "Failed to solve proof of work";
- }
- else
- {
- Peer::ptr pptr (peer.lock ());
-
- if (pptr)
- {
- protocol::TMProofWork reply;
- reply.set_token (pow->getToken ());
- reply.set_response (solution.begin (), solution.size ());
- pptr->sendPacket (std::make_shared (reply, protocol::mtPROOFOFWORK), false);
- }
- else
- {
- // WRITEME: Save solved proof of work for new connection
- }
- }
- }
-
- static void checkTransaction (Job&, int flags, SerializedTransaction::pointer stx, std::weak_ptr peer)
- {
- #ifndef TRUST_NETWORK
- try
- {
- #endif
-
- if (stx->isFieldPresent(sfLastLedgerSequence) &&
- (stx->getFieldU32 (sfLastLedgerSequence) <
- getApp().getLedgerMaster().getValidLedgerIndex()))
- { // Transaction has expired
- getApp().getHashRouter().setFlag(stx->getTransactionID(), SF_BAD);
- charge (peer, Resource::feeUnwantedData);
- return;
- }
-
- bool const needCheck = !(flags & SF_SIGGOOD);
- Transaction::pointer tx =
- std::make_shared (stx, needCheck);
-
- if (tx->getStatus () == INVALID)
- {
- getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD);
- charge (peer, Resource::feeInvalidSignature);
- return;
- }
- else
- getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_SIGGOOD);
-
- bool const trusted (flags & SF_TRUSTED);
- getApp().getOPs ().processTransaction (tx, trusted, false, false);
-
- #ifndef TRUST_NETWORK
- }
- catch (...)
- {
- getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD);
- charge (peer, Resource::feeInvalidRequest);
- }
-
- #endif
- }
-
- // Called from our JobQueue
- static void checkPropose (Job& job, Overlay* pPeers, std::shared_ptr packet,
- LedgerProposal::pointer proposal, uint256 consensusLCL, RippleAddress nodePublic,
- std::weak_ptr peer, bool fromCluster)
- {
- bool sigGood = false;
- bool isTrusted = (job.getType () == jtPROPOSAL_t);
-
- WriteLog (lsTRACE, Peer) << "Checking " <<
- (isTrusted ? "trusted" : "UNTRUSTED") <<
- " proposal";
-
- assert (packet);
- protocol::TMProposeSet& set = *packet;
-
- uint256 prevLedger;
-
- if (set.has_previousledger ())
- {
- // proposal includes a previous ledger
- WriteLog(lsTRACE, Peer) << "proposal with previous ledger";
- memcpy (prevLedger.begin (), set.previousledger ().data (), 256 / 8);
-
- if (!fromCluster && !proposal->checkSign (set.signature ()))
- {
- Peer::ptr p = peer.lock ();
- WriteLog(lsWARNING, Peer) << "proposal with previous ledger fails sig check: " <<
- *p;
- charge (peer, Resource::feeInvalidSignature);
- return;
- }
- else
- sigGood = true;
- }
- else
- {
- if (consensusLCL.isNonZero () && proposal->checkSign (set.signature ()))
- {
- prevLedger = consensusLCL;
- sigGood = true;
- }
- else
- {
- // Could be mismatched prev ledger
- WriteLog(lsWARNING, Peer) << "Ledger proposal fails signature check";
- proposal->setSignature (set.signature ());
- }
- }
-
- if (isTrusted)
- {
- getApp().getOPs ().processTrustedProposal (proposal, packet, nodePublic, prevLedger, sigGood);
- }
- else if (sigGood && (prevLedger == consensusLCL))
- {
- // relay untrusted proposal
- WriteLog(lsTRACE, Peer) << "relaying UNTRUSTED proposal";
- std::set peers;
-
- if (getApp().getHashRouter ().swapSet (
- proposal->getSuppressionID (), peers, SF_RELAYED))
- {
- pPeers->foreach (send_if_not (
- std::make_shared (set, protocol::mtPROPOSE_LEDGER),
- peer_in_set(peers)));
- }
- }
- else
- {
- WriteLog(lsDEBUG, Peer) << "Not relaying UNTRUSTED proposal";
- }
- }
-
- static void checkValidation (Job&, Overlay* pPeers, SerializedValidation::pointer val, bool isTrusted, bool isCluster,
- std::shared_ptr packet, std::weak_ptr peer)
- {
- #ifndef TRUST_NETWORK
-
- try
- #endif
- {
- uint256 signingHash = val->getSigningHash();
- if (!isCluster && !val->isValid (signingHash))
- {
- WriteLog(lsWARNING, Peer) << "Validation is invalid";
- charge (peer, Resource::feeInvalidRequest);
- return;
- }
-
- std::string source;
- Peer::ptr lp = peer.lock ();
-
- if (lp)
- source = to_string(*lp);
- else
- source = "unknown";
-
- std::set peers;
-
- //----------------------------------------------------------------------
- //
- {
- SerializedValidation const& sv (*val);
- Validators::ReceivedValidation rv;
- rv.ledgerHash = sv.getLedgerHash ();
- rv.publicKey = sv.getSignerPublic();
- getApp ().getValidators ().receiveValidation (rv);
- }
- //
- //----------------------------------------------------------------------
-
- if (getApp().getOPs ().recvValidation (val, source) &&
- getApp().getHashRouter ().swapSet (signingHash, peers, SF_RELAYED))
- {
- pPeers->foreach (send_if_not (
- std::make_shared (*packet, protocol::mtVALIDATION),
- peer_in_set(peers)));
- }
- }
-
- #ifndef TRUST_NETWORK
- catch (...)
- {
- WriteLog(lsTRACE, Peer) << "Exception processing validation";
- charge (peer, Resource::feeInvalidRequest);
- }
- #endif
- }
-};
-
-//------------------------------------------------------------------------------
-
-const boost::posix_time::seconds PeerImp::nodeVerifySeconds (15);
-
-//------------------------------------------------------------------------------
-
-// to_string should not be used we should just use lexical_cast maybe
-
-inline std::string to_string (PeerImp const& peer)
-{
- if (peer.isInCluster())
- return peer.getClusterNodeName();
-
- return peer.getRemoteAddress().to_string();
-}
-
-inline std::string to_string (PeerImp const* peer)
-{
- return to_string (*peer);
-}
-
-inline std::ostream& operator<< (std::ostream& os, PeerImp const& peer)
-{
- os << to_string (peer);
-
- return os;
-}
-
-inline std::ostream& operator<< (std::ostream& os, PeerImp const* peer)
-{
- os << to_string (peer);
-
- return os;
-}
-
-//------------------------------------------------------------------------------
-
-inline std::string to_string (Peer const& peer)
-{
- if (peer.isInCluster())
- return peer.getClusterNodeName();
-
- return peer.getRemoteAddress().to_string();
-}
-
-inline std::string to_string (Peer const* peer)
-{
- return to_string (*peer);
-}
-
-inline std::ostream& operator<< (std::ostream& os, Peer const& peer)
-{
- os << to_string (peer);
-
- return os;
-}
-
-inline std::ostream& operator<< (std::ostream& os, Peer const* peer)
-{
- os << to_string (peer);
-
- return os;
-}
-
-}
-
-#endif
diff --git a/src/ripple/module/rpc/handlers/Connect.cpp b/src/ripple/module/rpc/handlers/Connect.cpp
index ee1f64ae24b..7aa182c9d01 100644
--- a/src/ripple/module/rpc/handlers/Connect.cpp
+++ b/src/ripple/module/rpc/handlers/Connect.cpp
@@ -17,7 +17,7 @@
*/
//==============================================================================
-#include
+#include
namespace ripple {
diff --git a/src/ripple/module/rpc/handlers/Peers.cpp b/src/ripple/module/rpc/handlers/Peers.cpp
index 4e8708dc33e..fe818e4e82e 100644
--- a/src/ripple/module/rpc/handlers/Peers.cpp
+++ b/src/ripple/module/rpc/handlers/Peers.cpp
@@ -17,7 +17,7 @@
*/
//==============================================================================
-#include
+#include
namespace ripple {
diff --git a/src/ripple/module/overlay/api/Message.h b/src/ripple/overlay/Message.h
similarity index 100%
rename from src/ripple/module/overlay/api/Message.h
rename to src/ripple/overlay/Message.h
diff --git a/src/ripple/module/overlay/api/Overlay.h b/src/ripple/overlay/Overlay.h
similarity index 76%
rename from src/ripple/module/overlay/api/Overlay.h
rename to src/ripple/overlay/Overlay.h
index ee378d02224..24b8056da6e 100644
--- a/src/ripple/module/overlay/api/Overlay.h
+++ b/src/ripple/overlay/Overlay.h
@@ -20,7 +20,7 @@
#ifndef RIPPLE_OVERLAY_OVERLAY_H_INCLUDED
#define RIPPLE_OVERLAY_OVERLAY_H_INCLUDED
-#include
+#include
// VFALCO TODO Remove this include dependency it shouldn't be needed
#include
@@ -52,27 +52,43 @@ class Overlay
public:
typedef std::vector PeerSequence;
- virtual ~Overlay () = default;
+ virtual
+ ~Overlay () = default;
- // VFALCO NOTE These should be a private API
- /** @{ */
- virtual void remove (PeerFinder::Slot::ptr const& slot) = 0;
- /** @} */
-
- virtual void connect (beast::IP::Endpoint const& address) = 0;
+ /** Establish a peer connection to the specified endpoint.
+ The call returns immediately, the connection attempt is
+ performed asynchronously.
+ */
+ virtual
+ void
+ connect (beast::IP::Endpoint const& address) = 0;
- // Notification that a peer has connected.
- virtual void onPeerActivated (Peer::ptr const& peer) = 0;
+ /** Returns the number of active peers.
+ Active peers are only those peers that have completed the
+ handshake and are using the peer protocol.
+ */
+ virtual
+ std::size_t
+ size () = 0;
- // Notification that a peer has disconnected.
- virtual void onPeerDisconnect (Peer::ptr const& peer) = 0;
+ /** Return diagnostics on the status of all peers.
+ @deprecated This is superceded by PropertyStream
+ */
+ virtual
+ Json::Value
+ json () = 0;
- virtual std::size_t size () = 0;
- virtual Json::Value json () = 0;
- virtual PeerSequence getActivePeers () = 0;
+ /** Returns a sequence representing the current list of peers.
+ The snapshot is made at the time of the call.
+ */
+ virtual
+ PeerSequence
+ getActivePeers () = 0;
- // Peer 64-bit ID function
- virtual Peer::ptr findPeerByShortID (Peer::ShortId const& id) = 0;
+ /** Returns the peer with the matching short id, or null. */
+ virtual
+ Peer::ptr
+ findPeerByShortID (Peer::ShortId const& id) = 0;
/** Visit every active peer and return a value
The functor must:
@@ -96,10 +112,8 @@ class Overlay
foreach(Function f)
{
PeerSequence peers (getActivePeers());
-
for(PeerSequence::const_iterator i = peers.begin(); i != peers.end(); ++i)
f (*i);
-
return f();
}
diff --git a/src/ripple/module/overlay/api/Peer.h b/src/ripple/overlay/Peer.h
similarity index 95%
rename from src/ripple/module/overlay/api/Peer.h
rename to src/ripple/overlay/Peer.h
index d8c86a25852..2d85756de21 100644
--- a/src/ripple/module/overlay/api/Peer.h
+++ b/src/ripple/overlay/Peer.h
@@ -20,7 +20,7 @@
#ifndef RIPPLE_OVERLAY_PEER_H_INCLUDED
#define RIPPLE_OVERLAY_PEER_H_INCLUDED
-#include
+#include
#include
#include
@@ -47,7 +47,7 @@ class Peer
// Network
//
- virtual void sendPacket (const Message::pointer& packet, bool onStrand) = 0;
+ virtual void send (Message::pointer const& m) = 0;
virtual beast::IP::Endpoint getRemoteAddress() const = 0;
/** Adjust this peer's load balance based on the type of load imposed. */
diff --git a/src/ripple/overlay/README.md b/src/ripple/overlay/README.md
new file mode 100644
index 00000000000..ff0e0ce2125
--- /dev/null
+++ b/src/ripple/overlay/README.md
@@ -0,0 +1,25 @@
+# Overlay
+
+## Introduction
+
+The _Ripple payment network_ consists of a collection of _peers_ running
+**rippled**. Each peer maintains multiple outgoing connections and optional
+incoming connections to other peers. These connections are made over both
+the public Internet and private local area networks. This network defines a
+fully connected directed graph of nodes where vertices are instances of rippled
+and edges are persistent TCP/IP connections. Peers send and receive messages to
+other connected peers. This peer to peer network, layered on top of the public
+and private Internet, forms an [_overlay network_][overlay_network]. The
+contents of the messages and the behavior of peers in response to the messages,
+plus the information exchanged during the handshaking phase of connection
+establishment, defines the _Ripple peer protocol_ (_protocol_ in this context).
+
+## Overview
+
+Each connection is represented by a _Peer_ object. The Overlay manager
+establishes, receives, and maintains connections to peers. Protocol
+messages are exchanged between peers and serialized using
+[_Google Protocol Buffers_][protocol_buffers].
+
+[overlay_network]: http://en.wikipedia.org/wiki/Overlay_network
+[protocol_buffers]: https://developers.google.com/protocol-buffers/
diff --git a/src/ripple/module/overlay/TODO.md b/src/ripple/overlay/TODO.md
similarity index 100%
rename from src/ripple/module/overlay/TODO.md
rename to src/ripple/overlay/TODO.md
diff --git a/src/ripple/module/overlay/impl/Message.cpp b/src/ripple/overlay/impl/Message.cpp
similarity index 98%
rename from src/ripple/module/overlay/impl/Message.cpp
rename to src/ripple/overlay/impl/Message.cpp
index 10df27ccbe6..90e40bce824 100644
--- a/src/ripple/module/overlay/impl/Message.cpp
+++ b/src/ripple/overlay/impl/Message.cpp
@@ -17,7 +17,7 @@
*/
//==============================================================================
-#include
+#include
#include
diff --git a/src/ripple/module/overlay/impl/MessageStream.h b/src/ripple/overlay/impl/MessageStream.h
similarity index 100%
rename from src/ripple/module/overlay/impl/MessageStream.h
rename to src/ripple/overlay/impl/MessageStream.h
diff --git a/src/ripple/module/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp
similarity index 98%
rename from src/ripple/module/overlay/impl/OverlayImpl.cpp
rename to src/ripple/overlay/impl/OverlayImpl.cpp
index 09eda877dd6..403fe70d436 100644
--- a/src/ripple/module/overlay/impl/OverlayImpl.cpp
+++ b/src/ripple/overlay/impl/OverlayImpl.cpp
@@ -17,12 +17,16 @@
*/
//==============================================================================
-#include
-#include
-#include
+#include
+#include
+#include
#include
+#if DOXYGEN
+#include
+#endif
+
namespace ripple {
SETUP_LOG (Peer)
@@ -306,7 +310,7 @@ OverlayImpl::send (PeerFinder::Slot::ptr const& slot,
assert (iter != m_peers.end ());
PeerImp::ptr const peer (iter->second.lock());
assert (peer != nullptr);
- peer->sendPacket (msg, false);
+ peer->send (msg);
}
}
diff --git a/src/ripple/module/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h
similarity index 92%
rename from src/ripple/module/overlay/impl/OverlayImpl.h
rename to src/ripple/overlay/impl/OverlayImpl.h
index 52c16164e47..adee62a8b29 100644
--- a/src/ripple/module/overlay/impl/OverlayImpl.h
+++ b/src/ripple/overlay/impl/OverlayImpl.h
@@ -20,7 +20,7 @@
#ifndef RIPPLE_OVERLAY_OVERLAYIMPL_H_INCLUDED
#define RIPPLE_OVERLAY_OVERLAYIMPL_H_INCLUDED
-#include
+#include
#include
#include
@@ -47,7 +47,7 @@ class OverlayImpl
: public Overlay
, public PeerFinder::Callback
{
-public:
+private:
typedef boost::asio::ip::tcp::socket socket_type;
typedef std::unordered_map
-#include
+#include
+#include
#include
namespace ripple {
diff --git a/src/ripple/module/overlay/impl/PeerDoor.h b/src/ripple/overlay/impl/PeerDoor.h
similarity index 96%
rename from src/ripple/module/overlay/impl/PeerDoor.h
rename to src/ripple/overlay/impl/PeerDoor.h
index 5ca48abc530..27abeef9195 100644
--- a/src/ripple/module/overlay/impl/PeerDoor.h
+++ b/src/ripple/overlay/impl/PeerDoor.h
@@ -20,7 +20,7 @@
#ifndef RIPPLE_PEERDOOR_H_INCLUDED
#define RIPPLE_PEERDOOR_H_INCLUDED
-#include
+#include
#include //
diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp
new file mode 100644
index 00000000000..0334ddbe20d
--- /dev/null
+++ b/src/ripple/overlay/impl/PeerImp.cpp
@@ -0,0 +1,1332 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2012, 2013 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#include
+#include
+#include
+
+namespace ripple {
+
+//------------------------------------------------------------------------------
+
+/** A peer has sent us transaction set data */
+// VFALCO TODO Make this non-static
+static void peerTXData (Job&,
+ std::weak_ptr wPeer,
+ uint256 const& hash,
+ std::shared_ptr pPacket,
+ beast::Journal journal)
+{
+ std::shared_ptr peer = wPeer.lock ();
+ if (!peer)
+ return;
+
+ protocol::TMLedgerData& packet = *pPacket;
+
+ std::list nodeIDs;
+ std::list< Blob > nodeData;
+ for (int i = 0; i < packet.nodes ().size (); ++i)
+ {
+ const protocol::TMLedgerNode& node = packet.nodes (i);
+
+ if (!node.has_nodeid () || !node.has_nodedata () || (node.nodeid ().size () != 33))
+ {
+ journal.warning << "LedgerData request with invalid node ID";
+ peer->charge (Resource::feeInvalidRequest);
+ return;
+ }
+
+ nodeIDs.push_back (SHAMapNode (node.nodeid ().data (), node.nodeid ().size ()));
+ nodeData.push_back (Blob (node.nodedata ().begin (), node.nodedata ().end ()));
+ }
+
+ SHAMapAddNode san;
+ {
+ Application::ScopedLockType lock (getApp ().getMasterLock ());
+
+ san = getApp().getOPs().gotTXData (peer, hash, nodeIDs, nodeData);
+ }
+
+ if (san.isInvalid ())
+ {
+ peer->charge (Resource::feeUnwantedData);
+ }
+}
+
+// VFALCO NOTE This function is way too big and cumbersome.
+void
+PeerImp::getLedger (protocol::TMGetLedger& packet)
+{
+ SHAMap::pointer map;
+ protocol::TMLedgerData reply;
+ bool fatLeaves = true, fatRoot = false;
+
+ if (packet.has_requestcookie ())
+ reply.set_requestcookie (packet.requestcookie ());
+
+ std::string logMe;
+
+ if (packet.itype () == protocol::liTS_CANDIDATE)
+ {
+ // Request is for a transaction candidate set
+ m_journal.trace << "Received request for TX candidate set data "
+ << to_string (this);
+
+ if ((!packet.has_ledgerhash () || packet.ledgerhash ().size () != 32))
+ {
+ charge (Resource::feeInvalidRequest);
+ m_journal.warning << "invalid request for TX candidate set data";
+ return;
+ }
+
+ uint256 txHash;
+ memcpy (txHash.begin (), packet.ledgerhash ().data (), 32);
+
+ {
+ Application::ScopedLockType lock (getApp ().getMasterLock ());
+ map = getApp().getOPs ().getTXMap (txHash);
+ }
+
+ if (!map)
+ {
+ if (packet.has_querytype () && !packet.has_requestcookie ())
+ {
+ m_journal.debug << "Trying to route TX set request";
+
+ struct get_usable_peers
+ {
+ typedef Overlay::PeerSequence return_type;
+
+ Overlay::PeerSequence usablePeers;
+ uint256 const& txHash;
+ Peer const* skip;
+
+ get_usable_peers(uint256 const& hash, Peer const* s)
+ : txHash(hash), skip(s)
+ { }
+
+ void operator() (Peer::ptr const& peer)
+ {
+ if (peer->hasTxSet (txHash) && (peer.get () != skip))
+ usablePeers.push_back (peer);
+ }
+
+ return_type operator() ()
+ {
+ return usablePeers;
+ }
+ };
+
+ Overlay::PeerSequence usablePeers (m_overlay.foreach (
+ get_usable_peers (txHash, this)));
+
+ if (usablePeers.empty ())
+ {
+ m_journal.info << "Unable to route TX set request";
+ return;
+ }
+
+ Peer::ptr const& selectedPeer = usablePeers[rand () % usablePeers.size ()];
+ packet.set_requestcookie (getShortId ());
+ selectedPeer->send (
+ std::make_shared (packet, protocol::mtGET_LEDGER));
+ return;
+ }
+
+ m_journal.error << "We do not have the map our peer wants "
+ << to_string (this);
+
+ charge (Resource::feeInvalidRequest);
+ return;
+ }
+
+ reply.set_ledgerseq (0);
+ reply.set_ledgerhash (txHash.begin (), txHash.size ());
+ reply.set_type (protocol::liTS_CANDIDATE);
+ fatLeaves = false; // We'll already have most transactions
+ fatRoot = true; // Save a pass
+ }
+ else
+ {
+ if (getApp().getFeeTrack().isLoadedLocal() && !m_clusterNode)
+ {
+ m_journal.debug << "Too busy to fetch ledger data";
+ return;
+ }
+
+ // Figure out what ledger they want
+ m_journal.trace << "Received request for ledger data "
+ << to_string (this);
+ Ledger::pointer ledger;
+
+ if (packet.has_ledgerhash ())
+ {
+ uint256 ledgerhash;
+
+ if (packet.ledgerhash ().size () != 32)
+ {
+ charge (Resource::feeInvalidRequest);
+ m_journal.warning << "Invalid request";
+ return;
+ }
+
+ memcpy (ledgerhash.begin (), packet.ledgerhash ().data (), 32);
+ logMe += "LedgerHash:";
+ logMe += to_string (ledgerhash);
+ ledger = getApp().getLedgerMaster ().getLedgerByHash (ledgerhash);
+
+ if (!ledger && m_journal.trace)
+ m_journal.trace << "Don't have ledger " << ledgerhash;
+
+ if (!ledger && (packet.has_querytype () && !packet.has_requestcookie ()))
+ {
+ std::uint32_t seq = 0;
+
+ if (packet.has_ledgerseq ())
+ seq = packet.ledgerseq ();
+
+ Overlay::PeerSequence peerList = m_overlay.getActivePeers ();
+ Overlay::PeerSequence usablePeers;
+ BOOST_FOREACH (Peer::ptr const& peer, peerList)
+ {
+ if (peer->hasLedger (ledgerhash, seq) && (peer.get () != this))
+ usablePeers.push_back (peer);
+ }
+
+ if (usablePeers.empty ())
+ {
+ m_journal.trace << "Unable to route ledger request";
+ return;
+ }
+
+ Peer::ptr const& selectedPeer = usablePeers[rand () % usablePeers.size ()];
+ packet.set_requestcookie (getShortId ());
+ selectedPeer->send (
+ std::make_shared (packet, protocol::mtGET_LEDGER));
+ m_journal.debug << "Ledger request routed";
+ return;
+ }
+ }
+ else if (packet.has_ledgerseq ())
+ {
+ if (packet.ledgerseq() < getApp().getLedgerMaster().getEarliestFetch())
+ {
+ m_journal.debug << "Peer requests early ledger";
+ return;
+ }
+ ledger = getApp().getLedgerMaster ().getLedgerBySeq (packet.ledgerseq ());
+ if (!ledger && m_journal.debug)
+ m_journal.debug << "Don't have ledger " << packet.ledgerseq ();
+ }
+ else if (packet.has_ltype () && (packet.ltype () == protocol::ltCURRENT))
+ {
+ ledger = getApp().getLedgerMaster ().getCurrentLedger ();
+ }
+ else if (packet.has_ltype () && (packet.ltype () == protocol::ltCLOSED) )
+ {
+ ledger = getApp().getLedgerMaster ().getClosedLedger ();
+
+ if (ledger && !ledger->isClosed ())
+ ledger = getApp().getLedgerMaster ().getLedgerBySeq (ledger->getLedgerSeq () - 1);
+ }
+ else
+ {
+ charge (Resource::feeInvalidRequest);
+ m_journal.warning << "Can't figure out what ledger they want";
+ return;
+ }
+
+ if ((!ledger) || (packet.has_ledgerseq () && (packet.ledgerseq () != ledger->getLedgerSeq ())))
+ {
+ charge (Resource::feeInvalidRequest);
+
+ if (m_journal.warning && ledger)
+ m_journal.warning << "Ledger has wrong sequence";
+
+ return;
+ }
+
+ if (!packet.has_ledgerseq() && (ledger->getLedgerSeq() < getApp().getLedgerMaster().getEarliestFetch()))
+ {
+ m_journal.debug << "Peer requests early ledger";
+ return;
+ }
+
+ // Fill out the reply
+ uint256 lHash = ledger->getHash ();
+ reply.set_ledgerhash (lHash.begin (), lHash.size ());
+ reply.set_ledgerseq (ledger->getLedgerSeq ());
+ reply.set_type (packet.itype ());
+
+ if (packet.itype () == protocol::liBASE)
+ {
+ // they want the ledger base data
+ m_journal.trace << "They want ledger base data";
+ Serializer nData (128);
+ ledger->addRaw (nData);
+ reply.add_nodes ()->set_nodedata (nData.getDataPtr (), nData.getLength ());
+
+ SHAMap::pointer map = ledger->peekAccountStateMap ();
+
+ if (map && map->getHash ().isNonZero ())
+ {
+ // return account state root node if possible
+ Serializer rootNode (768);
+
+ if (map->getRootNode (rootNode, snfWIRE))
+ {
+ reply.add_nodes ()->set_nodedata (rootNode.getDataPtr (), rootNode.getLength ());
+
+ if (ledger->getTransHash ().isNonZero ())
+ {
+ map = ledger->peekTransactionMap ();
+
+ if (map && map->getHash ().isNonZero ())
+ {
+ rootNode.erase ();
+
+ if (map->getRootNode (rootNode, snfWIRE))
+ reply.add_nodes ()->set_nodedata (rootNode.getDataPtr (), rootNode.getLength ());
+ }
+ }
+ }
+ }
+
+ Message::pointer oPacket = std::make_shared (reply, protocol::mtLEDGER_DATA);
+ send (oPacket);
+ return;
+ }
+
+ if (packet.itype () == protocol::liTX_NODE)
+ {
+ map = ledger->peekTransactionMap ();
+ logMe += " TX:";
+ logMe += to_string (map->getHash ());
+ }
+ else if (packet.itype () == protocol::liAS_NODE)
+ {
+ map = ledger->peekAccountStateMap ();
+ logMe += " AS:";
+ logMe += to_string (map->getHash ());
+ }
+ }
+
+ if (!map || (packet.nodeids_size () == 0))
+ {
+ m_journal.warning << "Can't find map or empty request";
+ charge (Resource::feeInvalidRequest);
+ return;
+ }
+
+ m_journal.trace << "Request: " << logMe;
+
+ for (int i = 0; i < packet.nodeids ().size (); ++i)
+ {
+ SHAMapNode mn (packet.nodeids (i).data (), packet.nodeids (i).size ());
+
+ if (!mn.isValid ())
+ {
+ m_journal.warning << "Request for invalid node: " << logMe;
+ charge (Resource::feeInvalidRequest);
+ return;
+ }
+
+ std::vector nodeIDs;
+ std::list< Blob > rawNodes;
+
+ try
+ {
+ if (map->getNodeFat (mn, nodeIDs, rawNodes, fatRoot, fatLeaves))
+ {
+ assert (nodeIDs.size () == rawNodes.size ());
+ m_journal.trace << "getNodeFat got " << rawNodes.size () << " nodes";
+ std::vector::iterator nodeIDIterator;
+ std::list< Blob >::iterator rawNodeIterator;
+
+ for (nodeIDIterator = nodeIDs.begin (), rawNodeIterator = rawNodes.begin ();
+ nodeIDIterator != nodeIDs.end (); ++nodeIDIterator, ++rawNodeIterator)
+ {
+ Serializer nID (33);
+ nodeIDIterator->addIDRaw (nID);
+ protocol::TMLedgerNode* node = reply.add_nodes ();
+ node->set_nodeid (nID.getDataPtr (), nID.getLength ());
+ node->set_nodedata (&rawNodeIterator->front (), rawNodeIterator->size ());
+ }
+ }
+ else
+ m_journal.warning << "getNodeFat returns false";
+ }
+ catch (std::exception&)
+ {
+ std::string info;
+
+ if (packet.itype () == protocol::liTS_CANDIDATE)
+ info = "TS candidate";
+ else if (packet.itype () == protocol::liBASE)
+ info = "Ledger base";
+ else if (packet.itype () == protocol::liTX_NODE)
+ info = "TX node";
+ else if (packet.itype () == protocol::liAS_NODE)
+ info = "AS node";
+
+ if (!packet.has_ledgerhash ())
+ info += ", no hash specified";
+
+ m_journal.warning << "getNodeFat( " << mn << ") throws exception: " << info;
+ }
+ }
+
+ Message::pointer oPacket = std::make_shared (reply, protocol::mtLEDGER_DATA);
+ send (oPacket);
+}
+
+// This is dispatched by the job queue
+static
+void
+sGetLedger (std::weak_ptr wPeer,
+ std::shared_ptr packet)
+{
+ std::shared_ptr peer = wPeer.lock ();
+
+ if (peer)
+ peer->getLedger (*packet);
+}
+
+//------------------------------------------------------------------------------
+
+void
+PeerImp::async_read_some()
+{
+ m_socket->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes),
+ m_strand.wrap (std::bind (&PeerImp::on_read_some,
+ shared_from_this(), beast::asio::placeholders::error,
+ beast::asio::placeholders::bytes_transferred)));
+}
+
+void
+PeerImp::on_read_some (error_code ec, std::size_t bytes_transferred)
+{
+ if (m_detaching)
+ return;
+
+ if (ec == boost::asio::error::operation_aborted)
+ return;
+
+ if (! ec)
+ {
+ read_buffer_.commit (bytes_transferred);
+ ec = message_stream_.write_one (read_buffer_.data());
+ read_buffer_.consume (bytes_transferred);
+ }
+
+ if (ec)
+ {
+ m_journal.info <<
+ "on_read_some: " << ec.message();
+ detach("on_read_some");
+ return;
+ }
+
+ async_read_some();
+}
+
+//------------------------------------------------------------------------------
+//
+// abstract_protocol_handler
+//
+//------------------------------------------------------------------------------
+
+PeerImp::error_code
+PeerImp::on_message_unknown (std::uint16_t type)
+{
+ error_code ec;
+ // TODO
+ return ec;
+}
+
+PeerImp::error_code
+PeerImp::on_message_begin (std::uint16_t type,
+ std::shared_ptr <::google::protobuf::Message> const& m)
+{
+ error_code ec;
+
+#if 0
+ beast::debug_ostream log;
+ log << m->DebugString();
+#endif
+
+ if (type == protocol::mtHELLO && m_state != stateConnected)
+ {
+ m_journal.warning <<
+ "Unexpected TMHello";
+ ec = invalid_argument_error();
+ }
+ else if (type != protocol::mtHELLO && m_state == stateConnected)
+ {
+ m_journal.warning <<
+ "Expected TMHello";
+ ec = invalid_argument_error();
+ }
+
+ if (! ec)
+ {
+ load_event_ = getApp().getJobQueue ().getLoadEventAP (
+ jtPEER, protocol_message_name(type));
+ }
+
+ return ec;
+}
+
+void
+PeerImp::on_message_end (std::uint16_t,
+ std::shared_ptr <::google::protobuf::Message> const&)
+{
+ load_event_.reset();
+}
+
+PeerImp::error_code
+PeerImp::on_message (std::shared_ptr const& m)
+{
+ error_code ec;
+
+ bool bDetach (true);
+
+ m_timer.cancel ();
+
+ std::uint32_t const ourTime (getApp().getOPs ().getNetworkTimeNC ());
+ std::uint32_t const minTime (ourTime - clockToleranceDeltaSeconds);
+ std::uint32_t const maxTime (ourTime + clockToleranceDeltaSeconds);
+
+#ifdef BEAST_DEBUG
+ if (m->has_nettime ())
+ {
+ std::int64_t to = ourTime;
+ to -= m->nettime ();
+ m_journal.debug <<
+ "Connect: time offset " << to;
+ }
+#endif
+
+ BuildInfo::Protocol protocol (m->protoversion());
+
+ if (m->has_nettime () &&
+ ((m->nettime () < minTime) || (m->nettime () > maxTime)))
+ {
+ if (m->nettime () > maxTime)
+ {
+ m_journal.info <<
+ "Hello: Clock for " << *this <<
+ " is off by +" << m->nettime () - ourTime;
+ }
+ else if (m->nettime () < minTime)
+ {
+ m_journal.info <<
+ "Hello: Clock for " << *this <<
+ " is off by -" << ourTime - m->nettime ();
+ }
+ }
+ else if (m->protoversionmin () > BuildInfo::getCurrentProtocol().toPacked ())
+ {
+ std::string reqVersion (
+ protocol.toStdString ());
+
+ std::string curVersion (
+ BuildInfo::getCurrentProtocol().toStdString ());
+
+ m_journal.info <<
+ "Hello: Disconnect: Protocol mismatch [" <<
+ "Peer expects " << reqVersion <<
+ " and we run " << curVersion << "]";
+ }
+ else if (! m_nodePublicKey.setNodePublic (m->nodepublic ()))
+ {
+ m_journal.info <<
+ "Hello: Disconnect: Bad node public key.";
+ }
+ else if (! m_nodePublicKey.verifyNodePublic (
+ m_secureCookie, m->nodeproof (), ECDSA::not_strict))
+ {
+ // Unable to verify they have private key for claimed public key.
+ m_journal.info <<
+ "Hello: Disconnect: Failed to verify session.";
+ }
+ else
+ {
+ // Successful connection.
+ m_journal.info <<
+ "Hello: Connect: " << m_nodePublicKey.humanNodePublic ();
+
+ if ((protocol != BuildInfo::getCurrentProtocol()) &&
+ m_journal.active(beast::Journal::Severity::kInfo))
+ {
+ m_journal.info <<
+ "Peer protocol: " << protocol.toStdString ();
+ }
+
+ mHello = *m;
+
+ // Determine if this peer belongs to our cluster and get it's name
+ m_clusterNode = getApp().getUNL().nodeInCluster (
+ m_nodePublicKey, m_nodeName);
+
+ if (m_clusterNode)
+ m_journal.info <<
+ "Connected to cluster node " << m_nodeName;
+
+ assert (m_state == stateConnected);
+ m_state = stateHandshaked;
+
+ m_peerFinder.on_handshake (m_slot, RipplePublicKey (m_nodePublicKey),
+ m_clusterNode);
+
+ // XXX Set timer: connection is in grace period to be useful.
+ // XXX Set timer: connection idle (idle may vary depending on connection type.)
+ if ((mHello.has_ledgerclosed ()) && (
+ mHello.ledgerclosed ().size () == (256 / 8)))
+ {
+ memcpy (m_closedLedgerHash.begin (),
+ mHello.ledgerclosed ().data (), 256 / 8);
+
+ if ((mHello.has_ledgerprevious ()) &&
+ (mHello.ledgerprevious ().size () == (256 / 8)))
+ {
+ memcpy (m_previousLedgerHash.begin (),
+ mHello.ledgerprevious ().data (), 256 / 8);
+ addLedger (m_previousLedgerHash);
+ }
+ else
+ {
+ m_previousLedgerHash.zero ();
+ }
+ }
+
+ bDetach = false;
+ }
+
+ if (bDetach)
+ {
+ //m_nodePublicKey.clear ();
+ //detach ("recvh");
+
+ ec = invalid_argument_error();
+ }
+ else
+ {
+ sendGetPeers ();
+ }
+
+ return ec;
+}
+
+PeerImp::error_code
+PeerImp::on_message (std::shared_ptr const& m)
+{
+ error_code ec;
+ if (m->type () == protocol::TMPing::ptPING)
+ {
+ m->set_type (protocol::TMPing::ptPONG);
+ send (std::make_shared (*m, protocol::mtPING));
+ }
+ return ec;
+}
+
+PeerImp::error_code
+PeerImp::on_message (std::shared_ptr const& m)
+{
+ error_code ec;
+ if (m->has_response ())
+ {
+ // this is an answer to a proof of work we requested
+ if (m->response ().size () != (256 / 8))
+ {
+ charge (Resource::feeInvalidRequest);
+ return ec;
+ }
+
+ uint256 response;
+ memcpy (response.begin (), m->response ().data (), 256 / 8);
+
+ // VFALCO TODO Use a dependency injection here
+ PowResult r = getApp().getProofOfWorkFactory ().checkProof (m->token (), response);
+
+ if (r == powOK)
+ {
+ // credit peer
+ // WRITEME
+ return ec;
+ }
+
+ // return error message
+ // WRITEME
+ if (r != powTOOEASY)
+ {
+ charge (Resource::feeBadProofOfWork);
+ }
+
+ return ec;
+ }
+
+ if (m->has_result ())
+ {
+ // this is a reply to a proof of work we sent
+ // WRITEME
+ }
+
+ if (m->has_target () && m->has_challenge () && m->has_iterations ())
+ {
+ // this is a challenge
+ // WRITEME: Reject from inbound connections
+
+ uint256 challenge, target;
+
+ if ((m->challenge ().size () != (256 / 8)) || (m->target ().size () != (256 / 8)))
+ {
+ charge (Resource::feeInvalidRequest);
+ return ec;
+ }
+
+ memcpy (challenge.begin (), m->challenge ().data (), 256 / 8);
+ memcpy (target.begin (), m->target ().data (), 256 / 8);
+ ProofOfWork::pointer pow = std::make_shared (m->token (), m->iterations (),
+ challenge, target);
+
+ if (!pow->isValid ())
+ {
+ charge (Resource::feeInvalidRequest);
+ return ec;
+ }
+
+#if 0 // Until proof of work is completed, don't do it
+ getApp().getJobQueue ().addJob (
+ jtPROOFWORK,
+ "recvProof->doProof",
+ std::bind (&PeerImp::doProofOfWork, std::placeholders::_1,
+ std::weak_ptr (shared_from_this ()), pow));
+#endif
+
+ return ec;
+ }
+
+ m_journal.info << "Received in valid proof of work object from peer";
+
+ return ec;
+}
+
+PeerImp::error_code
+PeerImp::on_message (std::shared_ptr const& m)
+{
+ error_code ec;
+ if (!m_clusterNode)
+ {
+ charge (Resource::feeUnwantedData);
+ return ec;
+ }
+
+ for (int i = 0; i < m->clusternodes().size(); ++i)
+ {
+ protocol::TMClusterNode const& node = m->clusternodes(i);
+
+ std::string name;
+ if (node.has_nodename())
+ name = node.nodename();
+ ClusterNodeStatus s(name, node.nodeload(), node.reporttime());
+
+ RippleAddress nodePub;
+ nodePub.setNodePublic(node.publickey());
+
+ getApp().getUNL().nodeUpdate(nodePub, s);
+ }
+
+ int loadSources = m->loadsources().size();
+ if (loadSources != 0)
+ {
+ Resource::Gossip gossip;
+ gossip.items.reserve (loadSources);
+ for (int i = 0; i < m->loadsources().size(); ++i)
+ {
+ protocol::TMLoadSource const& node = m->loadsources (i);
+ Resource::Gossip::Item item;
+ item.address = beast::IP::Endpoint::from_string (node.name());
+ item.balance = node.cost();
+ if (item.address != beast::IP::Endpoint())
+ gossip.items.push_back(item);
+ }
+ m_resourceManager.importConsumers (m_nodeName, gossip);
+ }
+
+ getApp().getFeeTrack().setClusterFee(getApp().getUNL().getClusterFee());
+ return ec;
+}
+
+PeerImp::error_code
+PeerImp::on_message (std::shared_ptr const& m)
+{
+ error_code ec;
+ // VFALCO TODO This message is now obsolete due to PeerFinder
+ return ec;
+}
+
+PeerImp::error_code
+PeerImp::on_message (std::shared_ptr const& m)
+{
+ error_code ec;
+ // VFALCO TODO This message is now obsolete due to PeerFinder
+ std::vector list;
+ list.reserve (m->nodes().size());
+ for (int i = 0; i < m->nodes ().size (); ++i)
+ {
+ in_addr addr;
+
+ addr.s_addr = m->nodes (i).ipv4 ();
+
+ {
+ beast::IP::AddressV4 v4 (ntohl (addr.s_addr));
+ beast::IP::Endpoint address (v4, m->nodes (i).ipv4port ());
+ list.push_back (address);
+ }
+ }
+
+ if (! list.empty())
+ m_peerFinder.on_legacy_endpoints (list);
+ return ec;
+}
+
+PeerImp::error_code
+PeerImp::on_message (std::shared_ptr const& m)
+{
+ error_code ec;
+ std::vector endpoints;
+
+ endpoints.reserve (m->endpoints().size());
+
+ for (int i = 0; i < m->endpoints ().size (); ++i)
+ {
+ PeerFinder::Endpoint endpoint;
+ protocol::TMEndpoint const& tm (m->endpoints(i));
+
+ // hops
+ endpoint.hops = tm.hops();
+
+ // ipv4
+ if (endpoint.hops > 0)
+ {
+ in_addr addr;
+ addr.s_addr = tm.ipv4().ipv4();
+ beast::IP::AddressV4 v4 (ntohl (addr.s_addr));
+ endpoint.address = beast::IP::Endpoint (v4, tm.ipv4().ipv4port ());
+ }
+ else
+ {
+ // This Endpoint describes the peer we are connected to.
+ // We will take the remote address seen on the socket and
+ // store that in the IP::Endpoint. If this is the first time,
+ // then we'll verify that their listener can receive incoming
+ // by performing a connectivity test.
+ //
+ endpoint.address = m_remoteAddress.at_port (
+ tm.ipv4().ipv4port ());
+ }
+
+ endpoints.push_back (endpoint);
+ }
+
+ if (! endpoints.empty())
+ m_peerFinder.on_endpoints (m_slot, endpoints);
+ return ec;
+}
+
+PeerImp::error_code
+PeerImp::on_message (std::shared_ptr const& m)
+{
+ error_code ec;
+ Serializer s (m->rawtransaction ());
+
+ try
+ {
+ SerializerIterator sit (s);
+ SerializedTransaction::pointer stx = std::make_shared (std::ref (sit));
+ uint256 txID = stx->getTransactionID();
+
+ int flags;
+
+ if (! getApp().getHashRouter ().addSuppressionPeer (txID, m_shortId, flags))
+ {
+ // we have seen this transaction recently
+ if (flags & SF_BAD)
+ {
+ charge (Resource::feeInvalidSignature);
+ return ec;
+ }
+
+ if (!(flags & SF_RETRY))
+ return ec;
+ }
+
+ m_journal.debug << "Got transaction from peer " << *this << ": " << txID;
+
+ if (m_clusterNode)
+ flags |= SF_TRUSTED | SF_SIGGOOD;
+
+ if (getApp().getJobQueue().getJobCount(jtTRANSACTION) > 100)
+ m_journal.info << "Transaction queue is full";
+ else if (getApp().getLedgerMaster().getValidatedLedgerAge() > 240)
+ m_journal.trace << "No new transactions until synchronized";
+ else
+ getApp().getJobQueue ().addJob (jtTRANSACTION,
+ "recvTransaction->checkTransaction",
+ std::bind (
+ &PeerImp::checkTransaction, std::placeholders::_1,
+ flags, stx,
+ std::weak_ptr (shared_from_this ())));
+
+ }
+ catch (...)
+ {
+ m_journal.warning << "Transaction invalid: " <<
+ s.getHex();
+ }
+ return ec;
+}
+
+PeerImp::error_code
+PeerImp::on_message (std::shared_ptr const& m)
+{
+ error_code ec;
+ getApp().getJobQueue().addJob (jtPACK, "recvGetLedger",
+ std::bind (&sGetLedger, std::weak_ptr (shared_from_this ()), m));
+ return ec;
+}
+
+PeerImp::error_code
+PeerImp::on_message (std::shared_ptr const& m)
+{
+ error_code ec;
+ protocol::TMLedgerData& packet = *m;
+
+ if (m->nodes ().size () <= 0)
+ {
+ m_journal.warning << "Ledger/TXset data with no nodes";
+ return ec;
+ }
+
+ if (m->has_requestcookie ())
+ {
+ Peer::ptr target = m_overlay.findPeerByShortID (m->requestcookie ());
+
+ if (target)
+ {
+ m->clear_requestcookie ();
+ target->send (std::make_shared (packet, protocol::mtLEDGER_DATA));
+ }
+ else
+ {
+ m_journal.info << "Unable to route TX/ledger data reply";
+ charge (Resource::feeUnwantedData);
+ }
+
+ return ec;
+ }
+
+ uint256 hash;
+
+ if (m->ledgerhash ().size () != 32)
+ {
+ m_journal.warning << "TX candidate reply with invalid hash size";
+ charge (Resource::feeInvalidRequest);
+ return ec;
+ }
+
+ memcpy (hash.begin (), m->ledgerhash ().data (), 32);
+
+ if (m->type () == protocol::liTS_CANDIDATE)
+ {
+ // got data for a candidate transaction set
+
+ getApp().getJobQueue().addJob (jtTXN_DATA, "recvPeerData",
+ std::bind (&peerTXData, std::placeholders::_1,
+ std::weak_ptr (shared_from_this ()),
+ hash, m, m_journal));
+
+ return ec;
+ }
+
+ if (!getApp().getInboundLedgers ().gotLedgerData (hash, shared_from_this(), m))
+ {
+ m_journal.trace << "Got data for unwanted ledger";
+ charge (Resource::feeUnwantedData);
+ }
+ return ec;
+}
+
+PeerImp::error_code
+PeerImp::on_message (std::shared_ptr const& m)
+{
+ error_code ec;
+ protocol::TMProposeSet& set = *m;
+
+ // VFALCO Magic numbers are bad
+ if ((set.closetime() + 180) < getApp().getOPs().getCloseTimeNC())
+ return ec;
+
+ // VFALCO Magic numbers are bad
+ // Roll this into a validation function
+ if ((set.currenttxhash ().size () != 32) || (set.nodepubkey ().size () < 28) ||
+ (set.signature ().size () < 56) || (set.nodepubkey ().size () > 128) || (set.signature ().size () > 128))
+ {
+ m_journal.warning << "Received proposal is malformed";
+ charge (Resource::feeInvalidSignature);
+ return ec;
+ }
+
+ if (set.has_previousledger () && (set.previousledger ().size () != 32))
+ {
+ m_journal.warning << "Received proposal is malformed";
+ charge (Resource::feeInvalidRequest);
+ return ec;
+ }
+
+ uint256 proposeHash, prevLedger;
+ memcpy (proposeHash.begin (), set.currenttxhash ().data (), 32);
+
+ if (set.has_previousledger ())
+ memcpy (prevLedger.begin (), set.previousledger ().data (), 32);
+
+ uint256 suppression = LedgerProposal::computeSuppressionID (proposeHash, prevLedger,
+ set.proposeseq(), set.closetime (),
+ Blob(set.nodepubkey ().begin (), set.nodepubkey ().end ()),
+ Blob(set.signature ().begin (), set.signature ().end ()));
+
+ if (! getApp().getHashRouter ().addSuppressionPeer (suppression, m_shortId))
+ {
+ m_journal.trace << "Received duplicate proposal from peer " << m_shortId;
+ return ec;
+ }
+
+ RippleAddress signerPublic = RippleAddress::createNodePublic (strCopy (set.nodepubkey ()));
+
+ if (signerPublic == getConfig ().VALIDATION_PUB)
+ {
+ m_journal.trace << "Received our own proposal from peer " << m_shortId;
+ return ec;
+ }
+
+ bool isTrusted = getApp().getUNL ().nodeInUNL (signerPublic);
+ if (!isTrusted && getApp().getFeeTrack ().isLoadedLocal ())
+ {
+ m_journal.debug << "Dropping UNTRUSTED proposal due to load";
+ return ec;
+ }
+
+ m_journal.trace << "Received " << (isTrusted ? "trusted" : "UNTRUSTED") <<
+ " proposal from " << m_shortId;
+
+ uint256 consensusLCL;
+
+ {
+ Application::ScopedLockType lock (getApp ().getMasterLock ());
+ consensusLCL = getApp().getOPs ().getConsensusLCL ();
+ }
+
+ LedgerProposal::pointer proposal = std::make_shared (
+ prevLedger.isNonZero () ? prevLedger : consensusLCL,
+ set.proposeseq (), proposeHash, set.closetime (), signerPublic, suppression);
+
+ getApp().getJobQueue ().addJob (isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
+ "recvPropose->checkPropose", std::bind (
+ &PeerImp::checkPropose, std::placeholders::_1, &m_overlay,
+ m, proposal, consensusLCL, m_nodePublicKey,
+ std::weak_ptr (shared_from_this ()), m_clusterNode));
+ return ec;
+}
+
+PeerImp::error_code
+PeerImp::on_message (std::shared_ptr const& m)
+{
+ error_code ec;
+ m_journal.trace << "Received status change from peer " <<
+ to_string (this);
+
+ if (!m->has_networktime ())
+ m->set_networktime (getApp().getOPs ().getNetworkTimeNC ());
+
+ if (!mLastStatus.has_newstatus () || m->has_newstatus ())
+ mLastStatus = *m;
+ else
+ {
+ // preserve old status
+ protocol::NodeStatus status = mLastStatus.newstatus ();
+ mLastStatus = *m;
+ m->set_newstatus (status);
+ }
+
+ if (m->newevent () == protocol::neLOST_SYNC)
+ {
+ if (!m_closedLedgerHash.isZero ())
+ {
+ m_journal.trace << "peer has lost sync " << to_string (this);
+ m_closedLedgerHash.zero ();
+ }
+
+ m_previousLedgerHash.zero ();
+ return ec;
+ }
+
+ if (m->has_ledgerhash () && (m->ledgerhash ().size () == (256 / 8)))
+ {
+ // a peer has changed ledgers
+ memcpy (m_closedLedgerHash.begin (), m->ledgerhash ().data (), 256 / 8);
+ addLedger (m_closedLedgerHash);
+ m_journal.trace << "peer LCL is " << m_closedLedgerHash <<
+ " " << to_string (this);
+ }
+ else
+ {
+ m_journal.trace << "peer has no ledger hash" << to_string (this);
+ m_closedLedgerHash.zero ();
+ }
+
+ if (m->has_ledgerhashprevious () && m->ledgerhashprevious ().size () == (256 / 8))
+ {
+ memcpy (m_previousLedgerHash.begin (), m->ledgerhashprevious ().data (), 256 / 8);
+ addLedger (m_previousLedgerHash);
+ }
+ else m_previousLedgerHash.zero ();
+
+ if (m->has_firstseq () && m->has_lastseq())
+ {
+ m_minLedger = m->firstseq ();
+ m_maxLedger = m->lastseq ();
+
+ // Work around some servers that report sequences incorrectly
+ if (m_minLedger == 0)
+ m_maxLedger = 0;
+ if (m_maxLedger == 0)
+ m_minLedger = 0;
+ }
+ return ec;
+}
+
+PeerImp::error_code
+PeerImp::on_message (std::shared_ptr const& m)
+{
+ error_code ec;
+ uint256 hashes;
+
+ if (m->hash ().size () != (256 / 8))
+ {
+ charge (Resource::feeInvalidRequest);
+ return ec;
+ }
+
+ uint256 hash;
+
+ // VFALCO TODO There should be no use of memcpy() throughout the program.
+ // TODO Clean up this magic number
+ //
+ memcpy (hash.begin (), m->hash ().data (), 32);
+
+ if (m->status () == protocol::tsHAVE)
+ addTxSet (hash);
+
+ {
+ Application::ScopedLockType lock (getApp ().getMasterLock ());
+
+ if (!getApp().getOPs ().hasTXSet (shared_from_this (), hash, m->status ()))
+ charge (Resource::feeUnwantedData);
+ }
+ return ec;
+}
+
+PeerImp::error_code
+PeerImp::on_message (std::shared_ptr const& m)
+{
+ error_code ec;
+ std::uint32_t closeTime = getApp().getOPs().getCloseTimeNC();
+
+ if (m->validation ().size () < 50)
+ {
+ m_journal.warning << "Too small validation from peer";
+ charge (Resource::feeInvalidRequest);
+ return ec;
+ }
+
+ try
+ {
+ Serializer s (m->validation ());
+ SerializerIterator sit (s);
+ SerializedValidation::pointer val = std::make_shared (std::ref (sit), false);
+
+ if (closeTime > (120 + val->getFieldU32(sfSigningTime)))
+ {
+ m_journal.trace << "Validation is more than two minutes old";
+ charge (Resource::feeUnwantedData);
+ return ec;
+ }
+
+ if (! getApp().getHashRouter ().addSuppressionPeer (s.getSHA512Half(), m_shortId))
+ {
+ m_journal.trace << "Validation is duplicate";
+ return ec;
+ }
+
+ bool isTrusted = getApp().getUNL ().nodeInUNL (val->getSignerPublic ());
+ if (isTrusted || !getApp().getFeeTrack ().isLoadedLocal ())
+ {
+ getApp().getJobQueue ().addJob (
+ isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
+ "recvValidation->checkValidation",
+ std::bind (
+ &PeerImp::checkValidation, std::placeholders::_1,
+ &m_overlay, val, isTrusted, m_clusterNode, m,
+ std::weak_ptr (shared_from_this ())));
+ }
+ else
+ {
+ m_journal.debug <<
+ "Dropping UNTRUSTED validation due to load";
+ }
+ }
+ catch (...)
+ {
+ m_journal.warning <<
+ "Exception processing validation";
+ charge (Resource::feeInvalidRequest);
+ }
+
+ return ec;
+}
+
+PeerImp::error_code
+PeerImp::on_message (std::shared_ptr const& m)
+{
+ error_code ec;
+ protocol::TMGetObjectByHash& packet = *m;
+
+ if (packet.query ())
+ {
+ // this is a query
+ if (packet.type () == protocol::TMGetObjectByHash::otFETCH_PACK)
+ {
+ doFetchPack (m);
+ return ec;
+ }
+
+ protocol::TMGetObjectByHash reply;
+
+ reply.set_query (false);
+
+ if (packet.has_seq ())
+ reply.set_seq (packet.seq ());
+
+ reply.set_type (packet.type ());
+
+ if (packet.has_ledgerhash ())
+ reply.set_ledgerhash (packet.ledgerhash ());
+
+ // This is a very minimal implementation
+ for (int i = 0; i < packet.objects_size (); ++i)
+ {
+ uint256 hash;
+ const protocol::TMIndexedObject& obj = packet.objects (i);
+
+ if (obj.has_hash () && (obj.hash ().size () == (256 / 8)))
+ {
+ memcpy (hash.begin (), obj.hash ().data (), 256 / 8);
+ // VFALCO TODO Move this someplace more sensible so we dont
+ // need to inject the NodeStore interfaces.
+ NodeObject::pointer hObj = getApp().getNodeStore ().fetch (hash);
+
+ if (hObj)
+ {
+ protocol::TMIndexedObject& newObj = *reply.add_objects ();
+ newObj.set_hash (hash.begin (), hash.size ());
+ newObj.set_data (&hObj->getData ().front (), hObj->getData ().size ());
+
+ if (obj.has_nodeid ())
+ newObj.set_index (obj.nodeid ());
+
+ if (!reply.has_seq () && (hObj->getLedgerIndex () != 0))
+ reply.set_seq (hObj->getLedgerIndex ());
+ }
+ }
+ }
+
+ m_journal.trace << "GetObjByHash had " << reply.objects_size () <<
+ " of " << packet.objects_size () <<
+ " for " << to_string (this);
+ send (std::make_shared (reply, protocol::mtGET_OBJECTS));
+ }
+ else
+ {
+ // this is a reply
+ std::uint32_t pLSeq = 0;
+ bool pLDo = true;
+ bool progress = false;
+
+ for (int i = 0; i < packet.objects_size (); ++i)
+ {
+ const protocol::TMIndexedObject& obj = packet.objects (i);
+
+ if (obj.has_hash () && (obj.hash ().size () == (256 / 8)))
+ {
+ if (obj.has_ledgerseq ())
+ {
+ if (obj.ledgerseq () != pLSeq)
+ {
+ if ((pLDo && (pLSeq != 0)) &&
+ m_journal.active(beast::Journal::Severity::kDebug))
+ m_journal.debug << "Received full fetch pack for " << pLSeq;
+
+ pLSeq = obj.ledgerseq ();
+ pLDo = !getApp().getOPs ().haveLedger (pLSeq);
+
+ if (!pLDo)
+ m_journal.debug << "Got pack for " << pLSeq << " too late";
+ else
+ progress = true;
+ }
+ }
+
+ if (pLDo)
+ {
+ uint256 hash;
+ memcpy (hash.begin (), obj.hash ().data (), 256 / 8);
+
+ std::shared_ptr< Blob > data (
+ std::make_shared< Blob > (
+ obj.data ().begin (), obj.data ().end ()));
+
+ getApp().getOPs ().addFetchPack (hash, data);
+ }
+ }
+ }
+
+ if ((pLDo && (pLSeq != 0)) &&
+ m_journal.active(beast::Journal::Severity::kDebug))
+ m_journal.debug << "Received partial fetch pack for " << pLSeq;
+
+ if (packet.type () == protocol::TMGetObjectByHash::otFETCH_PACK)
+ getApp().getOPs ().gotFetchPack (progress, pLSeq);
+ }
+ return ec;
+}
+
+//------------------------------------------------------------------------------
+
+} // ripple
diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h
new file mode 100644
index 00000000000..1573fbd9497
--- /dev/null
+++ b/src/ripple/overlay/impl/PeerImp.h
@@ -0,0 +1,1308 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2012, 2013 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#ifndef RIPPLE_OVERLAY_PEERIMP_H_INCLUDED
+#define RIPPLE_OVERLAY_PEERIMP_H_INCLUDED
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+// VFALCO This is unfortunate. Comment this out and
+// just include what is needed.
+#include
+
+#include
+#include
+
+#include
+
+namespace ripple {
+
+typedef boost::asio::ip::tcp::socket NativeSocketType;
+
+class PeerImp;
+
+std::string to_string (Peer const& peer);
+std::ostream& operator<< (std::ostream& os, Peer const& peer);
+
+std::string to_string (Peer const* peer);
+std::ostream& operator<< (std::ostream& os, Peer const* peer);
+
+std::string to_string (PeerImp const& peer);
+std::ostream& operator<< (std::ostream& os, PeerImp const& peer);
+
+std::string to_string (PeerImp const* peer);
+std::ostream& operator<< (std::ostream& os, PeerImp const* peer);
+
+//------------------------------------------------------------------------------
+
+class PeerImp
+ : public Peer
+ , public std::enable_shared_from_this
+ , private beast::LeakChecked
+ , private abstract_protocol_handler
+{
+private:
+ /** Time alloted for a peer to send a HELLO message (DEPRECATED) */
+ static const boost::posix_time::seconds nodeVerifySeconds;
+
+ /** The clock drift we allow a remote peer to have */
+ static const std::uint32_t clockToleranceDeltaSeconds = 20;
+
+ /** The length of the smallest valid finished message */
+ static const size_t sslMinimumFinishedLength = 12;
+
+ //--------------------------------------------------------------------------
+ /** We have accepted an inbound connection.
+
+ The connection state transitions from `stateConnect` to `stateConnected`
+ as `stateConnect`.
+ */
+ void accept ()
+ {
+ m_journal.info << "Accepted " << m_remoteAddress;
+
+ m_socket->set_verify_mode (boost::asio::ssl::verify_none);
+ m_socket->async_handshake (
+ boost::asio::ssl::stream_base::server,
+ m_strand.wrap (std::bind (
+ &PeerImp::handleStart,
+ std::static_pointer_cast (shared_from_this ()),
+ beast::asio::placeholders::error)));
+ }
+
+ /** Attempt an outbound connection.
+
+ The connection may fail (for a number of reasons) and we do not know
+ what will happen at this point.
+
+ The connection state does not transition with this function and remains
+ as `stateConnecting`.
+ */
+ void connect ()
+ {
+ m_journal.info << "Connecting to " << m_remoteAddress;
+
+ boost::system::error_code err;
+
+ m_timer.expires_from_now (nodeVerifySeconds, err);
+
+ m_timer.async_wait (m_strand.wrap (std::bind (
+ &PeerImp::handleVerifyTimer,
+ shared_from_this (), beast::asio::placeholders::error)));
+
+ if (err)
+ {
+ m_journal.error << "Failed to set verify timer.";
+ detach ("c2");
+ return;
+ }
+
+ m_socket->next_layer ().async_connect (
+ beast::IPAddressConversion::to_asio_endpoint (m_remoteAddress),
+ m_strand.wrap (std::bind (&PeerImp::onConnect,
+ shared_from_this (), beast::asio::placeholders::error)));
+ }
+
+public:
+ /** Current state */
+ enum State
+ {
+ /** An connection is being established (outbound) */
+ stateConnecting
+
+ /** Connection has been successfully established */
+ ,stateConnected
+
+ /** Handshake has been received from this peer */
+ ,stateHandshaked
+
+ /** Running the Ripple protocol actively */
+ ,stateActive
+
+ /** Gracefully closing */
+ ,stateGracefulClose
+ };
+
+ typedef std::shared_ptr ptr;
+
+ NativeSocketType m_owned_socket;
+
+ beast::Journal m_journal;
+
+ // A unique identifier (up to a restart of rippled) for this particular
+ // peer instance. A peer that disconnects will, upon reconnection, get a
+ // new ID.
+ ShortId m_shortId;
+
+ // Updated at each stage of the connection process to reflect
+ // the current conditions as closely as possible. This includes
+ // the case where we learn the true IP via a PROXY handshake.
+ beast::IP::Endpoint m_remoteAddress;
+
+ // These is up here to prevent warnings about order of initializations
+ //
+ Resource::Manager& m_resourceManager;
+ PeerFinder::Manager& m_peerFinder;
+ OverlayImpl& m_overlay;
+ bool m_inbound;
+
+ std::unique_ptr m_socket;
+ boost::asio::io_service::strand m_strand;
+
+ State m_state; // Current state
+ bool m_detaching; // True if detaching.
+ bool m_clusterNode; // True if peer is a node in our cluster
+ RippleAddress m_nodePublicKey; // Node public key of peer.
+ std::string m_nodeName;
+
+ // Both sides of the peer calculate this value and verify that it matches
+ // to detect/prevent man-in-the-middle attacks.
+ //
+ uint256 m_secureCookie;
+
+ // The indices of the smallest and largest ledgers this peer has available
+ //
+ LedgerIndex m_minLedger;
+ LedgerIndex m_maxLedger;
+
+ uint256 m_closedLedgerHash;
+ uint256 m_previousLedgerHash;
+
+ std::list m_recentLedgers;
+ std::list