BitShares-Core  6.1.0
BitShares blockchain implementation and command-line interface software
peer_connection.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Cryptonomex, Inc., and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 #pragma once
25 
26 #include <graphene/net/node.hpp>
29 #include <graphene/net/config.hpp>
30 
31 #include <boost/tuple/tuple.hpp>
32 
33 #include <boost/multi_index_container.hpp>
34 #include <boost/multi_index/ordered_index.hpp>
35 #include <boost/multi_index/mem_fun.hpp>
36 #include <boost/multi_index/member.hpp>
37 #include <boost/multi_index/tag.hpp>
38 #include <boost/multi_index/hashed_index.hpp>
39 
40 #include <queue>
41 #include <boost/container/deque.hpp>
42 #include <fc/thread/future.hpp>
43 
44 namespace graphene { namespace net
45  {
46  class peer_connection;
48  {
49  public:
50  virtual ~peer_connection_delegate() = default; // virtual destructor: every other member of this interface is pure virtual
51  virtual void on_message(peer_connection* originating_peer,
52  const message& received_message) = 0; // callback taking a peer and a message; presumably the message was received from that peer — confirm at the call site
53  virtual void on_connection_closed(peer_connection* originating_peer) = 0; // callback taking a peer; presumably fired when that peer's connection closes — confirm at the call site
54  virtual message get_message_for_item(const item_id& item) = 0; // returns, by value, the message corresponding to the given item_id
55  };
56 
57  using peer_connection_ptr = std::shared_ptr<peer_connection>;
59  public std::enable_shared_from_this<peer_connection>
60  {
61  public:
63  {
64  disconnected,
65  just_connected,
66  connection_accepted,
67  connection_rejected
70  };
72  {
73  disconnected,
74  just_connected,
75  connection_accepted,
76  connection_rejected
77  };
79  {
80  disconnected,
81  connecting,
82  connected,
83  accepting,
84  accepted,
85  hello_sent,
86  peer_connection_accepted,
87  peer_connection_rejected,
88  negotiation_complete,
89  closing,
90  closed
91  };
92  private:
94  fc::optional<fc::ip::endpoint> _remote_endpoint;
95  message_oriented_connection _message_connection;
96 
97  /* Abstract base class for entries on the outgoing message queue. It hides the fact that some
98  * entries hold a complete message while others hold only the hash of a message.
99  */
100  struct queued_message
101  {
102  fc::time_point enqueue_time; // set by the constructor; defaults to fc::time_point::now() at construction
103  fc::time_point transmission_start_time; // not assigned in this struct (default-constructed here)
104  fc::time_point transmission_finish_time; // not assigned in this struct (default-constructed here)
105 
106  explicit queued_message(fc::time_point enqueue_time = fc::time_point::now()) :
107  enqueue_time(enqueue_time)
108  {}
109 
110  virtual message get_message(peer_connection_delegate* node) = 0; // produce the message for this entry; derived types decide how
114  virtual size_t get_size_in_queue() = 0; // size attributed to this entry while queued (units are not visible in this header)
115  virtual ~queued_message() = default; // virtual because entries are held as std::unique_ptr<queued_message>
116  };
117 
118  /* A queued entry that carries a full copy of the message: the message passed to the
119  * constructor is moved into message_to_send and stored there until it is sent.
120  */
121  struct real_queued_message : queued_message
122  {
123  message message_to_send; // the complete message held by this entry
124  size_t message_send_time_field_offset; // defaults to static_cast<size_t>(-1); presumably meaning "no send-time field" — confirm in the .cpp
125 
126  real_queued_message(message message_to_send,
127  size_t message_send_time_field_offset = static_cast<size_t>(-1)) : // named cast replaces the C-style (size_t)-1; the value is unchanged
128  message_to_send(std::move(message_to_send)),
129  message_send_time_field_offset(message_send_time_field_offset)
130  {}
131 
132  message get_message(peer_connection_delegate* node) override; // defined out of line
133  size_t get_size_in_queue() override; // defined out of line
134  };
135 
136  /* A queued entry that stores only the item_id (the hash) of the item we want to send,
137  * not the message itself. When the entry reaches the top of the queue, a callback is made
138  * to the node to generate the message.
139  */
140  struct virtual_queued_message : queued_message
141  {
142  item_id item_to_send; // identifies the item whose message is generated later
143 
144  explicit virtual_queued_message(item_id the_item_to_send) :
145  item_to_send(std::move(the_item_to_send))
146  {}
147 
148  message get_message(peer_connection_delegate* node) override; // defined out of line; presumably asks node for the message of item_to_send — confirm in the .cpp
149  size_t get_size_in_queue() override; // defined out of line
150  };
151 
152 
153  size_t _total_queued_messages_size = 0; // starts at 0; presumably the summed get_size_in_queue() of the queued entries — confirm in the .cpp
154  std::queue<std::unique_ptr<queued_message>, std::list<std::unique_ptr<queued_message> > > _queued_messages; // FIFO of owned queued_message entries, backed by std::list
155  fc::future<void> _send_queued_messages_done; // presumably the future of the send_queued_messages_task() run — confirm in the .cpp
156  public:
164 
165  our_connection_state our_state = our_connection_state::disconnected;
166  bool they_have_requested_close = false;
167  their_connection_state their_state = their_connection_state::disconnected;
168  bool we_have_requested_close = false;
169 
170  connection_negotiation_status negotiation_status = connection_negotiation_status::disconnected;
172 
173  fc::time_point get_connection_time()const { return _message_connection.get_connection_time(); } // forwards to the underlying _message_connection
174  fc::time_point get_connection_terminated_time()const { return connection_terminated_time; } // returns the connection_terminated_time member by value
175 
178 
185  uint32_t core_protocol_version = 0;
186  std::string user_agent;
193 
194  // Initially, these fields record info about our local socket;
195  // they are useless (except the remote_inbound_endpoint field for outbound connections).
196  // After we receive a hello message, they are replaced with the info in the hello message.
198  uint16_t inbound_port = 0;
199  uint16_t outbound_port = 0;
203  fc::flat_set<fc::ip::endpoint> additional_inbound_endpoints;
205  fc::flat_map<fc::ip::endpoint, firewalled_state> potential_inbound_endpoints;
207 
208  using item_to_time_map_type = std::unordered_map<item_id, fc::time_point>;
209 
213  boost::container::deque<item_hash_t> ids_of_items_to_get;
216  std::set<item_hash_t> ids_of_items_being_processed;
218  uint32_t number_of_unfetched_item_ids = 0;
219  bool peer_needs_sync_items_from_us = false;
220  bool we_need_sync_items_from_peer = false;
227  std::set<item_hash_t> sync_items_requested_from_peer;
231  bool inhibit_fetching_sync_blocks = false;
233 
237  {
240  timestamped_item_id(const item_id& item, const fc::time_point_sec timestamp) :
241  item(item),
242  timestamp(timestamp)
243  {}
244  };
245  struct timestamp_index{}; // empty tag type that names the timestamp-ordered index below
246  using timestamped_items_set_type = boost::multi_index_container< timestamped_item_id, // container of timestamped_item_id with two indices
247  boost::multi_index::indexed_by<
248  boost::multi_index::hashed_unique< // first index: hashed, one entry per distinct `item`
249  boost::multi_index::member<timestamped_item_id, item_id, &timestamped_item_id::item>,
250  std::hash<item_id> // hash functor used for the item_id key
251  >,
252  boost::multi_index::ordered_non_unique< // second index: ordered by `timestamp`, duplicates allowed
253  boost::multi_index::tag<timestamp_index>, // lets callers select this index by tag instead of by position
254  boost::multi_index::member<timestamped_item_id, fc::time_point_sec, &timestamped_item_id::timestamp>
255  >
256  >
257  >;
260 
265 
266  // if they're flooding us with transactions, we set this to avoid fetching for a few seconds to let the
267  // blockchain catch up
269 
270  uint32_t last_known_fork_block_number = 0;
271 
273 
275  bool expecting_address_message = false;
276 
277  private:
278 #ifndef NDEBUG
279  fc::thread* _thread = nullptr;
280  unsigned _send_message_queue_tasks_running = 0; // temporary debugging
281 #endif
282  bool _currently_handling_message = false;
284  protected:
286  private:
287  void destroy();
288  public:
290  static peer_connection_ptr make_shared(peer_connection_delegate* delegate);
291  virtual ~peer_connection();
292 
293  fc::tcp_socket& get_socket();
294  void accept_connection();
295  void connect_to(const fc::ip::endpoint& remote_endpoint,
297 
298  void on_message(message_oriented_connection* originating_connection, const message& received_message) override; // overrides a base-class virtual; takes the originating message-oriented connection and the received message
299  void on_connection_closed(message_oriented_connection* originating_connection) override; // overrides a base-class virtual; takes the originating message-oriented connection
300 
301  void send_queueable_message(std::unique_ptr<queued_message>&& message_to_send); // takes the owned entry by rvalue reference; presumably appends it to _queued_messages — confirm in the .cpp
302  virtual void send_message( const message& message_to_send, size_t message_send_time_field_offset = static_cast<size_t>(-1) ); // sends a complete message; the offset defaults to static_cast<size_t>(-1), the same value as real_queued_message's default (named cast replaces the C-style (size_t)-1)
303  void send_item(const item_id& item_to_send); // presumably queues a virtual_queued_message for this item — confirm in the .cpp
304  void close_connection();
305  void destroy_connection();
306 
307  uint64_t get_total_bytes_sent() const;
308  uint64_t get_total_bytes_received() const;
309 
310  fc::time_point get_last_message_sent_time() const;
311  fc::time_point get_last_message_received_time() const;
312 
313  fc::optional<fc::ip::endpoint> get_remote_endpoint(); // same type as the _remote_endpoint member; note this getter is not const-qualified
314  fc::ip::endpoint get_local_endpoint(); // note: not const-qualified
315  void set_remote_endpoint(fc::optional<fc::ip::endpoint> new_remote_endpoint);
316 
317  bool busy() const; // the note on item_ids_requested_from_peer says that member is checked in busy()
318  bool idle() const;
319  bool is_currently_handling_message() const; // presumably reports the _currently_handling_message flag — confirm in the .cpp
320 
321  bool is_transaction_fetching_inhibited() const; // presumably compares against transaction_fetching_inhibited_until — confirm in the .cpp
322  fc::sha512 get_shared_secret() const;
323  void clear_old_inventory();
324  bool is_inventory_advertised_to_us_list_full_for_transactions() const;
325  bool is_inventory_advertised_to_us_list_full() const;
326  fc::optional<fc::ip::endpoint> get_endpoint_for_connecting() const; // returns an optional, so there may be no endpoint to report
327  private:
328  void send_queued_messages_task(); // the three *_task() members are private; their definitions are not in this header
329  void accept_connection_task();
330  void connect_to_task(const fc::ip::endpoint& remote_endpoint);
331  };
332  using peer_connection_ptr = std::shared_ptr<peer_connection>; // repeats the alias already declared before the class (same type); written as an alias-declaration to match it
333 
334  } } // end namespace graphene::net
335 
336 // not sent over the wire, just reflected for logging
338  (just_connected)
339  (connection_accepted)
340  (connection_rejected))
342  (just_connected)
343  (connection_accepted)
344  (connection_rejected))
346  (connecting)
347  (connected)
348  (accepting)
349  (accepted)
350  (hello_sent)
351  (peer_connection_accepted)
352  (peer_connection_rejected)
353  (negotiation_complete)
354  (closing)
355  (closed) )
356 
item_to_time_map_type items_requested_from_peer
fc::future< void > accept_or_connect_task_done
item_hash_t last_block_delegate_has_seen
The hash of the last block this peer has told us about that the peer knows.
timestamped_items_set_type inventory_peer_advertised_to_us
std::unordered_map< item_id, fc::time_point > item_to_time_map_type
const uint32_t core_protocol_version
#define FC_REFLECT(TYPE, MEMBERS)
Specializes fc::reflector for TYPE.
Definition: reflect.hpp:388
Definition: api.cpp:48
fc::time_point get_connection_terminated_time() const
std::set< item_hash_t > sync_items_requested_from_peer
IDs of blocks we've requested from this peer during sync. Fetch from another peer if this peer discon...
timestamped_items_set_type inventory_advertised_to_peer
FC_REFLECT_ENUM(graphene::net::core_message_type_enum,(trx_message_type)(block_message_type)(core_message_type_first)(item_ids_inventory_message_type)(blockchain_item_ids_inventory_message_type)(fetch_blockchain_item_ids_message_type)(fetch_items_message_type)(item_not_available_message_type)(hello_message_type)(connection_accepted_message_type)(connection_rejected_message_type)(address_request_message_type)(address_message_type)(closing_connection_message_type)(current_time_request_message_type)(current_time_reply_message_type)(check_firewall_message_type)(check_firewall_reply_message_type)(get_current_connections_request_message_type)(get_current_connections_reply_message_type)(core_message_type_last))(different_chain)(already_connected)(connected_to_self)(not_accepting_connections)(blocked)(invalid_hello_message)(client_too_old))(inbound)(outbound))(firewalled)(not_firewalled))(unable_to_connect)(connection_successful)) namespace std
provides application independent P2P broadcast and data synchronization
Definition: node.hpp:190
fc::time_point transaction_fetching_inhibited_until
fc::time_point get_connection_time() const
timestamped_item_id(const item_id &item, const fc::time_point_sec timestamp)
fc::flat_map< fc::ip::endpoint, firewalled_state > potential_inbound_endpoints
Potential inbound endpoints of the peer.
fc::optional< std::string > platform
fc::optional< std::string > fc_git_revision_sha
std::shared_ptr< peer_connection > peer_connection_ptr
fc::optional< uint32_t > bitness
fc::flat_set< fc::ip::endpoint > additional_inbound_endpoints
Some nodes may be listening on multiple endpoints.
fc::optional< boost::tuple< std::vector< item_hash_t >, fc::time_point > > item_ids_requested_from_peer
We check this to detect a timed-out request and in busy()
fc::optional< std::string > graphene_git_revision_sha
fc::optional< fc::ip::endpoint > remote_inbound_endpoint
The inbound endpoint of the remote peer (our best guess)
boost::multi_index_container< timestamped_item_id, boost::multi_index::indexed_by< boost::multi_index::hashed_unique< boost::multi_index::member< timestamped_item_id, item_id, &timestamped_item_id::item >, std::hash< item_id > >, boost::multi_index::ordered_non_unique< boost::multi_index::tag< timestamp_index >, boost::multi_index::member< timestamped_item_id, fc::time_point_sec, &timestamped_item_id::timestamp > > > > timestamped_items_set_type
virtual void on_message(peer_connection *originating_peer, const message &received_message)=0
fc::optional< fc::time_point_sec > fc_git_revision_unix_timestamp
fc::optional< fc::time_point_sec > graphene_git_revision_unix_timestamp
std::set< item_hash_t > ids_of_items_being_processed
static time_point now()
Definition: time.cpp:13
boost::container::deque< item_hash_t > ids_of_items_to_get
virtual message get_message_for_item(const item_id &item)=0
fc::time_point_sec last_block_time_delegate_has_seen
virtual void on_connection_closed(peer_connection *originating_peer)=0