BitShares-Core  5.0.0
BitShares blockchain implementation and command-line interface software
peer_connection.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Cryptonomex, Inc., and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
26 #include <graphene/net/config.hpp>
28 
29 #include <fc/io/raw.hpp>
30 #include <fc/thread/thread.hpp>
31 
32 #include <boost/scope_exit.hpp>
33 
34 #ifdef DEFAULT_LOGGER
35 # undef DEFAULT_LOGGER
36 #endif
37 #define DEFAULT_LOGGER "p2p"
38 
39 #ifndef NDEBUG
40 # define VERIFY_CORRECT_THREAD() assert(_thread->is_current())
41 #else
42 # define VERIFY_CORRECT_THREAD() do {} while (0)
43 #endif
44 
45 namespace graphene { namespace net
46  {
47  message peer_connection::real_queued_message::get_message(peer_connection_delegate*)
48  {
49  if (message_send_time_field_offset != (size_t)-1)
50  {
51  // patch the current time into the message. Since this operates on the packed version of the structure,
52  // it won't work for anything after a variable-length field
53  std::vector<char> packed_current_time = fc::raw::pack(fc::time_point::now());
54  assert(message_send_time_field_offset + packed_current_time.size() <= message_to_send.data.size());
55  memcpy(message_to_send.data.data() + message_send_time_field_offset,
56  packed_current_time.data(), packed_current_time.size());
57  }
58  return message_to_send;
59  }
60  size_t peer_connection::real_queued_message::get_size_in_queue()
61  {
62  return message_to_send.data.size();
63  }
64  message peer_connection::virtual_queued_message::get_message(peer_connection_delegate* node)
65  {
66  return node->get_message_for_item(item_to_send);
67  }
68 
69  size_t peer_connection::virtual_queued_message::get_size_in_queue()
70  {
71  return sizeof(item_id);
72  }
73 
74  peer_connection::peer_connection(peer_connection_delegate* delegate) :
75  _node(delegate),
76  _message_connection(this),
77  _total_queued_messages_size(0),
79  is_firewalled(firewalled_state::unknown),
80  our_state(our_connection_state::disconnected),
81  they_have_requested_close(false),
82  their_state(their_connection_state::disconnected),
83  we_have_requested_close(false),
84  negotiation_status(connection_negotiation_status::disconnected),
85  number_of_unfetched_item_ids(0),
86  peer_needs_sync_items_from_us(true),
87  we_need_sync_items_from_peer(true),
88  inhibit_fetching_sync_blocks(false),
89  transaction_fetching_inhibited_until(fc::time_point::min()),
90  last_known_fork_block_number(0),
91  firewall_check_state(nullptr),
92 #ifndef NDEBUG
93  _thread(&fc::thread::current()),
94  _send_message_queue_tasks_running(0),
95 #endif
96  _currently_handling_message(false)
97  {
98  }
99 
101  {
102  // The lifetime of peer_connection objects is managed by shared_ptrs in node. The peer_connection
103  // is responsible for notifying the node when it should be deleted, and the process of deleting it
104  // cleans up the peer connection's asynchronous tasks which are responsible for notifying the node
105  // when it should be deleted.
106  // To ease this vicious cycle, we slightly delay the execution of the destructor until the
107  // current task yields. In the (not uncommon) case where it is the task executing
108  // connect_to or read_loop, this allows the task to finish before the destructor is forced
109  // to cancel it.
110  return peer_connection_ptr(new peer_connection(delegate));
111  //, [](peer_connection* peer_to_delete){ fc::async([peer_to_delete](){delete peer_to_delete;}); });
112  }
113 
114  void peer_connection::destroy()
115  {
117 
118 #if 0 // this gets too verbose
119 #ifndef NDEBUG
120  struct scope_logger {
122  scope_logger(const fc::optional<fc::ip::endpoint>& endpoint) : endpoint(endpoint) { dlog("entering peer_connection::destroy() for peer ${endpoint}", ("endpoint", endpoint)); }
123  ~scope_logger() { dlog("leaving peer_connection::destroy() for peer ${endpoint}", ("endpoint", endpoint)); }
124  } send_message_scope_logger(get_remote_endpoint());
125 #endif
126 #endif
127 
128  try
129  {
130  dlog("calling close_connection()");
132  dlog("close_connection completed normally");
133  }
134  catch ( const fc::canceled_exception& )
135  {
136  assert(false && "the task that deletes peers should not be canceled because it will prevent us from cleaning up correctly");
137  }
138  catch ( ... )
139  {
140  dlog("close_connection threw");
141  }
142 
143  try
144  {
145  dlog("canceling _send_queued_messages task");
146  _send_queued_messages_done.cancel_and_wait(__FUNCTION__);
147  dlog("cancel_and_wait completed normally");
148  }
149  catch( const fc::exception& e )
150  {
151  wlog("Unexpected exception from peer_connection's send_queued_messages_task : ${e}", ("e", e));
152  }
153  catch( ... )
154  {
155  wlog("Unexpected exception from peer_connection's send_queued_messages_task");
156  }
157 
158  try
159  {
160  dlog("canceling accept_or_connect_task");
162  dlog("accept_or_connect_task completed normally");
163  }
164  catch( const fc::exception& e )
165  {
166  wlog("Unexpected exception from peer_connection's accept_or_connect_task : ${e}", ("e", e));
167  }
168  catch( ... )
169  {
170  wlog("Unexpected exception from peer_connection's accept_or_connect_task");
171  }
172 
173  _message_connection.destroy_connection(); // shut down the read loop
174  }
175 
177  {
179  destroy();
180  }
181 
183  {
185  return _message_connection.get_socket();
186  }
187 
189  {
191 
192  struct scope_logger {
193  scope_logger() { dlog("entering peer_connection::accept_connection()"); }
194  ~scope_logger() { dlog("leaving peer_connection::accept_connection()"); }
195  } accept_connection_scope_logger;
196 
197  try
198  {
203  _message_connection.accept(); // perform key exchange
205  _remote_endpoint = _message_connection.get_socket().remote_endpoint();
206 
207  // firewall-detecting info is pretty useless for inbound connections, but initialize
208  // it the best we can
209  fc::ip::endpoint local_endpoint = _message_connection.get_socket().local_endpoint();
210  inbound_address = local_endpoint.get_address();
211  inbound_port = local_endpoint.port();
213 
216  ilog( "established inbound connection from ${remote_endpoint}, sending hello", ("remote_endpoint", _message_connection.get_socket().remote_endpoint() ) );
217  }
218  catch ( const fc::exception& e )
219  {
220  wlog( "error accepting connection ${e}", ("e", e.to_detail_string() ) );
221  throw;
222  }
223  }
224 
225  void peer_connection::connect_to( const fc::ip::endpoint& remote_endpoint, fc::optional<fc::ip::endpoint> local_endpoint )
226  {
228  try
229  {
233 
234  _remote_endpoint = remote_endpoint;
235  if( local_endpoint )
236  {
237  // the caller wants us to bind the local side of this socket to a specific ip/port
238  // This depends on the ip/port being unused, and on being able to set the
239  // SO_REUSEADDR/SO_REUSEPORT flags, and either of these might fail, so we need to
240  // detect if this fails.
241  try
242  {
243  _message_connection.bind( *local_endpoint );
244  }
245  catch ( const fc::canceled_exception& )
246  {
247  throw;
248  }
249  catch ( const fc::exception& except )
250  {
251  wlog( "Failed to bind to desired local endpoint ${endpoint}, will connect using an OS-selected endpoint: ${except}", ("endpoint", *local_endpoint )("except", except ) );
252  }
253  }
255  _message_connection.connect_to( remote_endpoint );
259  ilog( "established outbound connection to ${remote_endpoint}", ("remote_endpoint", remote_endpoint ) );
260  }
261  catch ( fc::exception& e )
262  {
263  wlog( "error connecting to peer ${remote_endpoint}: ${e}", ("remote_endpoint", remote_endpoint )("e", e.to_detail_string() ) );
264  throw;
265  }
266  } // connect_to()
267 
268  void peer_connection::on_message( message_oriented_connection* originating_connection, const message& received_message )
269  {
271  _currently_handling_message = true;
272  BOOST_SCOPE_EXIT(this_) {
273  this_->_currently_handling_message = false;
274  } BOOST_SCOPE_EXIT_END
275  _node->on_message( this, received_message );
276  }
277 
279  {
282  _node->on_connection_closed( this );
283  }
284 
285  void peer_connection::send_queued_messages_task()
286  {
288 #ifndef NDEBUG
289  struct counter {
290  unsigned& _send_message_queue_tasks_counter;
291  counter(unsigned& var) : _send_message_queue_tasks_counter(var) { /* dlog("entering peer_connection::send_queued_messages_task()"); */ assert(_send_message_queue_tasks_counter == 0); ++_send_message_queue_tasks_counter; }
292  ~counter() { assert(_send_message_queue_tasks_counter == 1); --_send_message_queue_tasks_counter; /* dlog("leaving peer_connection::send_queued_messages_task()"); */ }
293  } concurrent_invocation_counter(_send_message_queue_tasks_running);
294 #endif
295  while (!_queued_messages.empty())
296  {
297  _queued_messages.front()->transmission_start_time = fc::time_point::now();
298  message message_to_send = _queued_messages.front()->get_message(_node);
299  try
300  {
301  //dlog("peer_connection::send_queued_messages_task() calling message_oriented_connection::send_message() "
302  // "to send message of type ${type} for peer ${endpoint}",
303  // ("type", message_to_send.msg_type)("endpoint", get_remote_endpoint()));
304  _message_connection.send_message(message_to_send);
305  //dlog("peer_connection::send_queued_messages_task()'s call to message_oriented_connection::send_message() completed normally for peer ${endpoint}",
306  // ("endpoint", get_remote_endpoint()));
307  }
308  catch (const fc::canceled_exception&)
309  {
310  dlog("message_oriented_connection::send_message() was canceled, rethrowing canceled_exception");
311  throw;
312  }
313  catch (const fc::exception& send_error)
314  {
315  wlog("Error sending message: ${exception}. Closing connection.", ("exception", send_error));
316  try
317  {
319  }
320  catch (const fc::exception& close_error)
321  {
322  wlog("Caught error while closing connection: ${exception}", ("exception", close_error));
323  }
324  return;
325  }
326  catch (const std::exception& e)
327  {
328  wlog("message_oriented_exception::send_message() threw a std::exception(): ${what}", ("what", e.what()));
329  }
330  catch (...)
331  {
332  wlog("message_oriented_exception::send_message() threw an unhandled exception");
333  }
334  _queued_messages.front()->transmission_finish_time = fc::time_point::now();
335  _total_queued_messages_size -= _queued_messages.front()->get_size_in_queue();
336  _queued_messages.pop();
337  }
338  //dlog("leaving peer_connection::send_queued_messages_task() due to queue exhaustion");
339  }
340 
341  void peer_connection::send_queueable_message(std::unique_ptr<queued_message>&& message_to_send)
342  {
344  _total_queued_messages_size += message_to_send->get_size_in_queue();
345  _queued_messages.emplace(std::move(message_to_send));
346  if (_total_queued_messages_size > GRAPHENE_NET_MAXIMUM_QUEUED_MESSAGES_IN_BYTES)
347  {
348  wlog("send queue exceeded maximum size of ${max} bytes (current size ${current} bytes)",
349  ("max", GRAPHENE_NET_MAXIMUM_QUEUED_MESSAGES_IN_BYTES)("current", _total_queued_messages_size));
350  try
351  {
353  }
354  catch (const fc::exception& e)
355  {
356  wlog("Caught error while closing connection: ${exception}", ("exception", e));
357  }
358  return;
359  }
360 
361  if( _send_queued_messages_done.valid() && _send_queued_messages_done.canceled() )
362  FC_THROW_EXCEPTION(fc::exception, "Attempting to send a message on a connection that is being shut down");
363 
364  if (!_send_queued_messages_done.valid() || _send_queued_messages_done.ready())
365  {
366  //dlog("peer_connection::send_message() is firing up send_queued_message_task");
367  _send_queued_messages_done = fc::async([this](){ send_queued_messages_task(); }, "send_queued_messages_task");
368  }
369  //else
370  // dlog("peer_connection::send_message() doesn't need to fire up send_queued_message_task, it's already running");
371  }
372 
373  void peer_connection::send_message(const message& message_to_send, size_t message_send_time_field_offset)
374  {
376  //dlog("peer_connection::send_message() enqueueing message of type ${type} for peer ${endpoint}",
377  // ("type", message_to_send.msg_type)("endpoint", get_remote_endpoint()));
378  std::unique_ptr<queued_message> message_to_enqueue(new real_queued_message(message_to_send, message_send_time_field_offset));
379  send_queueable_message(std::move(message_to_enqueue));
380  }
381 
382  void peer_connection::send_item(const item_id& item_to_send)
383  {
385  //dlog("peer_connection::send_item() enqueueing message of type ${type} for peer ${endpoint}",
386  // ("type", item_to_send.item_type)("endpoint", get_remote_endpoint()));
387  std::unique_ptr<queued_message> message_to_enqueue(new virtual_queued_message(item_to_send));
388  send_queueable_message(std::move(message_to_enqueue));
389  }
390 
392  {
397  _message_connection.close_connection();
398  }
399 
401  {
404  destroy();
405  }
406 
408  {
410  return _message_connection.get_total_bytes_sent();
411  }
412 
414  {
416  return _message_connection.get_total_bytes_received();
417  }
418 
420  {
422  return _message_connection.get_last_message_sent_time();
423  }
424 
426  {
428  return _message_connection.get_last_message_received_time();
429  }
430 
432  {
434  return _remote_endpoint;
435  }
437  {
439  return _message_connection.get_socket().local_endpoint();
440  }
441 
443  {
445  _remote_endpoint = new_remote_endpoint;
446  }
447 
449  {
452  }
453 
455  {
457  return !busy();
458  }
459 
461  {
463  return _currently_handling_message;
464  }
465 
467  {
470  }
471 
473  {
475  return _message_connection.get_shared_secret();
476  }
477 
479  {
482 
483  // expire old items from inventory_advertised_to_peer
484  auto oldest_inventory_to_keep_iter = inventory_advertised_to_peer.get<timestamp_index>().lower_bound(oldest_inventory_to_keep);
485  auto begin_iter = inventory_advertised_to_peer.get<timestamp_index>().begin();
486  unsigned number_of_elements_advertised_to_peer_to_discard = std::distance(begin_iter, oldest_inventory_to_keep_iter);
487  inventory_advertised_to_peer.get<timestamp_index>().erase(begin_iter, oldest_inventory_to_keep_iter);
488 
489  // also expire items from inventory_peer_advertised_to_us
490  oldest_inventory_to_keep_iter = inventory_peer_advertised_to_us.get<timestamp_index>().lower_bound(oldest_inventory_to_keep);
491  begin_iter = inventory_peer_advertised_to_us.get<timestamp_index>().begin();
492  unsigned number_of_elements_peer_advertised_to_discard = std::distance(begin_iter, oldest_inventory_to_keep_iter);
493  inventory_peer_advertised_to_us.get<timestamp_index>().erase(begin_iter, oldest_inventory_to_keep_iter);
494  dlog("Expiring old inventory for peer ${peer}: removing ${to_peer} items advertised to peer (${remain_to_peer} left), and ${to_us} advertised to us (${remain_to_us} left)",
495  ("peer", get_remote_endpoint())
496  ("to_peer", number_of_elements_advertised_to_peer_to_discard)("remain_to_peer", inventory_advertised_to_peer.size())
497  ("to_us", number_of_elements_peer_advertised_to_discard)("remain_to_us", inventory_peer_advertised_to_us.size()));
498  }
499 
500  // we have a higher limit for blocks than transactions so we will still fetch blocks even when transactions are throttled
502  {
505  }
506 
508  {
510  // allow the total inventory size to be the maximum number of transactions we'll store in the inventory (above)
511  // plus the maximum number of blocks that would be generated in GRAPHENE_NET_MAX_INVENTORY_SIZE_IN_MINUTES (plus one,
512  // to give us some wiggle room)
513  return inventory_peer_advertised_to_us.size() >
516  }
517 
519  {
521  }
522 
524  {
525  if (inbound_port)
528  }
529 
530 } } // end namespace graphene::net
uint64_t get_total_bytes_received() const
item_to_time_map_type items_requested_from_peer
fc::future< void > accept_or_connect_task_done
peer_connection_direction direction
timestamped_items_set_type inventory_peer_advertised_to_us
auto async(Functor &&f, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> fc::future< decltype(f())>
Definition: thread.hpp:227
microseconds minutes(int64_t m)
Definition: time.hpp:36
connection_negotiation_status negotiation_status
void pack(Stream &s, const flat_set< T, A... > &value, uint32_t _max_depth)
Definition: flat.hpp:11
static peer_connection_ptr make_shared(peer_connection_delegate *delegate)
void on_connection_closed(message_oriented_connection *originating_connection) override
#define VERIFY_CORRECT_THREAD()
Definition: api.cpp:56
firewall_check_state_data * firewall_check_state
Used to generate a useful error report when an exception is thrown.At each level in the stack where t...
Definition: exception.hpp:56
std::string to_detail_string(log_level ll=log_level::all) const
Definition: exception.cpp:183
std::set< item_hash_t > sync_items_requested_from_peer
the time we received the last sync item or the time we sent the last batch of sync item requests to t...
void connect_to(const fc::ip::endpoint &remote_endpoint, fc::optional< fc::ip::endpoint > local_endpoint=fc::optional< fc::ip::endpoint >())
bool is_transaction_fetching_inhibited() const
#define wlog(FORMAT,...)
Definition: logger.hpp:123
timestamped_items_set_type inventory_advertised_to_peer
bool is_inventory_advertised_to_us_list_full_for_transactions() const
const address & get_address() const
Definition: ip.cpp:72
void on_message(message_oriented_connection *originating_connection, const message &received_message) override
void set_remote_endpoint(fc::optional< fc::ip::endpoint > new_remote_endpoint)
void send_queueable_message(std::unique_ptr< queued_message > &&message_to_send)
fc::time_point get_last_message_received_time() const
void send_message(const message &message_to_send, size_t message_send_time_field_offset=(size_t)-1)
fc::optional< fc::ip::endpoint > get_remote_endpoint()
fc::optional< fc::ip::endpoint > get_endpoint_for_connecting() const
static time_point min()
Definition: time.hpp:49
static thread & current()
Definition: thread.cpp:125
uint16_t port() const
Definition: ip.cpp:71
void cancel_and_wait(const char *reason FC_CANCELATION_REASON_DEFAULT_ARG)
Definition: future.hpp:314
#define GRAPHENE_MIN_BLOCK_INTERVAL
Definition: config.hpp:59
fc::time_point transaction_fetching_inhibited_until
#define ilog(FORMAT,...)
Definition: logger.hpp:117
fc::ecc::public_key_data node_id_t
boost::asio::ip::tcp::endpoint endpoint
Definition: asio.hpp:240
fc::ip::endpoint get_local_endpoint()
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
Definition: exception.hpp:378
bool is_inventory_advertised_to_us_list_full() const
#define GRAPHENE_NET_MAX_INVENTORY_SIZE_IN_MINUTES
Definition: config.hpp:83
std::shared_ptr< peer_connection > peer_connection_ptr
#define dlog(FORMAT,...)
Definition: logger.hpp:100
fc::optional< boost::tuple< std::vector< item_hash_t >, fc::time_point > > item_ids_requested_from_peer
fc::time_point get_last_message_sent_time() const
uint64_t get_total_bytes_sent() const
static time_point now()
Definition: time.cpp:13
#define GRAPHENE_NET_MAXIMUM_QUEUED_MESSAGES_IN_BYTES
Definition: config.hpp:61
void send_item(const item_id &item_to_send)
fc::sha512 get_shared_secret() const
#define GRAPHENE_NET_MAX_TRX_PER_SECOND
Definition: config.hpp:108
their_connection_state their_state