BitShares-Core  5.0.0
BitShares blockchain implementation and command-line interface software
node.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Cryptonomex, Inc., and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 #include <sstream>
25 #include <iomanip>
26 #include <deque>
27 #include <unordered_set>
28 #include <list>
29 #include <forward_list>
30 #include <iostream>
31 #include <algorithm>
32 #include <tuple>
33 #include <string>
34 #include <boost/tuple/tuple.hpp>
35 #include <boost/circular_buffer.hpp>
36 
37 #include <boost/multi_index_container.hpp>
38 #include <boost/multi_index/ordered_index.hpp>
39 #include <boost/multi_index/mem_fun.hpp>
40 #include <boost/multi_index/member.hpp>
41 #include <boost/multi_index/random_access_index.hpp>
42 #include <boost/multi_index/tag.hpp>
43 #include <boost/multi_index/sequenced_index.hpp>
44 #include <boost/multi_index/hashed_index.hpp>
45 #include <boost/logic/tribool.hpp>
46 #include <boost/range/algorithm_ext/push_back.hpp>
47 #include <boost/range/algorithm/find.hpp>
48 #include <boost/range/numeric.hpp>
49 
50 #include <boost/accumulators/accumulators.hpp>
51 #include <boost/accumulators/statistics/stats.hpp>
52 #include <boost/accumulators/statistics/rolling_mean.hpp>
53 #include <boost/accumulators/statistics/min.hpp>
54 #include <boost/accumulators/statistics/max.hpp>
55 #include <boost/accumulators/statistics/sum.hpp>
56 #include <boost/accumulators/statistics/count.hpp>
57 
58 #include <boost/preprocessor/seq/for_each.hpp>
59 #include <boost/preprocessor/cat.hpp>
60 #include <boost/preprocessor/stringize.hpp>
61 
62 #include <fc/thread/thread.hpp>
63 #include <fc/thread/future.hpp>
65 #include <fc/thread/mutex.hpp>
67 #include <fc/log/logger.hpp>
68 #include <fc/io/json.hpp>
69 #include <fc/io/enum_type.hpp>
70 #include <fc/io/raw.hpp>
71 #include <fc/crypto/rand.hpp>
73 #include <fc/network/ip.hpp>
74 #include <fc/network/resolve.hpp>
75 
76 #include <graphene/net/node.hpp>
80 #include <graphene/net/config.hpp>
82 
85 // Nasty hack: A circular dependency around fee_schedule is resolved by fwd-declaring it and using a shared_ptr
86 // to it in chain_parameters, which is used in an operation and thus must be serialized by the net library.
87 // Resolving that forward declaration doesn't happen until now:
89 
90 #include <fc/git_revision.hpp>
91 
92 //#define ENABLE_DEBUG_ULOGS
93 
94 #ifdef DEFAULT_LOGGER
95 # undef DEFAULT_LOGGER
96 #endif
97 #define DEFAULT_LOGGER "p2p"
98 
99 #define P2P_IN_DEDICATED_THREAD 1
100 
101 #define INVOCATION_COUNTER(name) \
102  static unsigned total_ ## name ## _counter = 0; \
103  static unsigned active_ ## name ## _counter = 0; \
104  struct name ## _invocation_logger { \
105  unsigned *total; \
106  unsigned *active; \
107  name ## _invocation_logger(unsigned *total, unsigned *active) : \
108  total(total), active(active) \
109  { \
110  ++*total; \
111  ++*active; \
112  dlog("NEWDEBUG: Entering " #name ", now ${total} total calls, ${active} active calls", ("total", *total)("active", *active)); \
113  } \
114  ~name ## _invocation_logger() \
115  { \
116  --*active; \
117  dlog("NEWDEBUG: Leaving " #name ", now ${total} total calls, ${active} active calls", ("total", *total)("active", *active)); \
118  } \
119  } invocation_logger(&total_ ## name ## _counter, &active_ ## name ## _counter)
120 
121 //log these messages even at warn level when operating on the test network
122 #ifdef GRAPHENE_TEST_NETWORK
123 #define testnetlog wlog
124 #else
125 #define testnetlog(...) do {} while (0)
126 #endif
127 
128 namespace graphene { namespace net {
129 
130  namespace detail
131  {
132  namespace bmi = boost::multi_index;
134  {
135  private:
136  static const uint32_t cache_duration_in_blocks = GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS;
137 
138  struct message_hash_index{};
139  struct message_contents_hash_index{};
140  struct block_clock_index{};
141  struct message_info
142  {
143  message_hash_type message_hash;
144  message message_body;
145  uint32_t block_clock_when_received;
146 
147  // for network performance stats
148  message_propagation_data propagation_data;
149  fc::uint160_t message_contents_hash; // hash of whatever the message contains (if it's a transaction, this is the transaction id, if it's a block, it's the block_id)
150 
151  message_info( const message_hash_type& message_hash,
152  const message& message_body,
153  uint32_t block_clock_when_received,
154  const message_propagation_data& propagation_data,
155  fc::uint160_t message_contents_hash ) :
156  message_hash( message_hash ),
157  message_body( message_body ),
158  block_clock_when_received( block_clock_when_received ),
159  propagation_data( propagation_data ),
160  message_contents_hash( message_contents_hash )
161  {}
162  };
163  typedef boost::multi_index_container
164  < message_info,
165  bmi::indexed_by< bmi::ordered_unique< bmi::tag<message_hash_index>,
166  bmi::member<message_info, message_hash_type, &message_info::message_hash> >,
167  bmi::ordered_non_unique< bmi::tag<message_contents_hash_index>,
168  bmi::member<message_info, fc::uint160_t, &message_info::message_contents_hash> >,
169  bmi::ordered_non_unique< bmi::tag<block_clock_index>,
170  bmi::member<message_info, uint32_t, &message_info::block_clock_when_received> > >
171  > message_cache_container;
172 
173  message_cache_container _message_cache;
174 
175  uint32_t block_clock;
176 
177  public:
179  block_clock( 0 )
180  {}
181  void block_accepted();
182  void cache_message( const message& message_to_cache, const message_hash_type& hash_of_message_to_cache,
183  const message_propagation_data& propagation_data, const fc::uint160_t& message_content_hash );
184  message get_message( const message_hash_type& hash_of_message_to_lookup );
185  message_propagation_data get_message_propagation_data( const fc::uint160_t& hash_of_message_contents_to_lookup ) const;
186  size_t size() const { return _message_cache.size(); }
187  };
188 
190  {
191  ++block_clock;
192  if( block_clock > cache_duration_in_blocks )
193  _message_cache.get<block_clock_index>().erase(_message_cache.get<block_clock_index>().begin(),
194  _message_cache.get<block_clock_index>().lower_bound(block_clock - cache_duration_in_blocks ) );
195  }
196 
198  const message_hash_type& hash_of_message_to_cache,
199  const message_propagation_data& propagation_data,
200  const fc::uint160_t& message_content_hash )
201  {
202  _message_cache.insert( message_info(hash_of_message_to_cache,
203  message_to_cache,
204  block_clock,
205  propagation_data,
206  message_content_hash ) );
207  }
208 
210  {
211  message_cache_container::index<message_hash_index>::type::const_iterator iter =
212  _message_cache.get<message_hash_index>().find(hash_of_message_to_lookup );
213  if( iter != _message_cache.get<message_hash_index>().end() )
214  return iter->message_body;
215  FC_THROW_EXCEPTION( fc::key_not_found_exception, "Requested message not in cache" );
216  }
217 
219  {
220  if( hash_of_message_contents_to_lookup != fc::uint160_t() )
221  {
222  message_cache_container::index<message_contents_hash_index>::type::const_iterator iter =
223  _message_cache.get<message_contents_hash_index>().find(hash_of_message_contents_to_lookup );
224  if( iter != _message_cache.get<message_contents_hash_index>().end() )
225  return iter->propagation_data;
226  }
227  FC_THROW_EXCEPTION( fc::key_not_found_exception, "Requested message not in cache" );
228  }
229 
231 
232  // This specifies configuration info for the local node. It's stored as JSON
233  // in the configuration directory (application data directory)
235  {
236  node_configuration() : accept_incoming_connections(true), wait_if_endpoint_is_busy(true) {}
237 
248  };
249 
250 
251 } } } // end namespace graphene::net::detail
253  (accept_incoming_connections)
255  (private_key));
256 
257 #include "node_impl.hxx"
258 
259 namespace graphene { namespace net { namespace detail {
260 
262  {
263 #ifdef P2P_IN_DEDICATED_THREAD
264  std::weak_ptr<fc::thread> weak_thread;
265  if (impl_to_delete)
266  {
267  std::shared_ptr<fc::thread> impl_thread(impl_to_delete->_thread);
268  weak_thread = impl_thread;
269  impl_thread->async([impl_to_delete](){ delete impl_to_delete; }, "delete node_impl").wait();
270  dlog("deleting the p2p thread");
271  }
272  if (weak_thread.expired())
273  dlog("done deleting the p2p thread");
274  else
275  dlog("failed to delete the p2p thread, we must be leaking a smart pointer somewhere");
276 #else // P2P_IN_DEDICATED_THREAD
277  delete impl_to_delete;
278 #endif // P2P_IN_DEDICATED_THREAD
279  }
280 
281 #ifdef P2P_IN_DEDICATED_THREAD
282 # define VERIFY_CORRECT_THREAD() assert(_thread->is_current())
283 #else
284 # define VERIFY_CORRECT_THREAD() do {} while (0)
285 #endif
286 
287 #define MAXIMUM_NUMBER_OF_BLOCKS_TO_HANDLE_AT_ONE_TIME 200
288 #define MAXIMUM_NUMBER_OF_BLOCKS_TO_PREFETCH (10 * MAXIMUM_NUMBER_OF_BLOCKS_TO_HANDLE_AT_ONE_TIME)
289 
290  node_impl::node_impl(const std::string& user_agent) :
292  _thread(std::make_shared<fc::thread>("p2p")),
293 #endif // P2P_IN_DEDICATED_THREAD
294  _delegate(nullptr),
295  _is_firewalled(firewalled_state::unknown),
296  _potential_peer_database_updated(false),
297  _sync_items_to_fetch_updated(false),
298  _suspend_fetching_sync_blocks(false),
299  _items_to_fetch_updated(false),
300  _items_to_fetch_sequence_counter(0),
301  _recent_block_interval_in_seconds(GRAPHENE_MAX_BLOCK_INTERVAL),
302  _user_agent_string(user_agent),
303  _desired_number_of_connections(GRAPHENE_NET_DEFAULT_DESIRED_CONNECTIONS),
304  _maximum_number_of_connections(GRAPHENE_NET_DEFAULT_MAX_CONNECTIONS),
305  _peer_connection_retry_timeout(GRAPHENE_NET_DEFAULT_PEER_CONNECTION_RETRY_TIME),
306  _peer_inactivity_timeout(GRAPHENE_NET_PEER_HANDSHAKE_INACTIVITY_TIMEOUT),
307  _most_recent_blocks_accepted(_maximum_number_of_connections),
308  _total_number_of_unfetched_items(0),
309  _rate_limiter(0, 0),
310  _last_reported_number_of_connections(0),
311  _peer_advertising_disabled(false),
312  _average_network_read_speed_seconds(60),
313  _average_network_write_speed_seconds(60),
314  _average_network_read_speed_minutes(60),
315  _average_network_write_speed_minutes(60),
316  _average_network_read_speed_hours(72),
317  _average_network_write_speed_hours(72),
318  _average_network_usage_second_counter(0),
319  _average_network_usage_minute_counter(0),
320  _node_is_shutting_down(false),
321  _maximum_number_of_blocks_to_handle_at_one_time(MAXIMUM_NUMBER_OF_BLOCKS_TO_HANDLE_AT_ONE_TIME),
322  _maximum_number_of_sync_blocks_to_prefetch(MAXIMUM_NUMBER_OF_BLOCKS_TO_PREFETCH),
323  _maximum_blocks_per_peer_during_syncing(GRAPHENE_NET_MAX_BLOCKS_PER_PEER_DURING_SYNCING)
324  {
326  fc::rand_bytes((char*) _node_id.data(), (int)_node_id.size());
327  }
328 
330  {
332  ilog( "cleaning up node" );
333  _node_is_shutting_down = true;
334 
335  {
337  for (const peer_connection_ptr& active_peer : _active_connections)
338  {
339  fc::optional<fc::ip::endpoint> inbound_endpoint = active_peer->get_endpoint_for_connecting();
340  if (inbound_endpoint)
341  {
343  .lookup_entry_for_endpoint(*inbound_endpoint);
344  if (updated_peer_record)
345  {
346  updated_peer_record->last_seen_time = fc::time_point::now();
347  _potential_peer_db.update_entry(*updated_peer_record);
348  }
349  }
350  }
351  }
352 
353  try
354  {
355  ilog( "close" );
356  close();
357  }
358  catch ( const fc::exception& e )
359  {
360  wlog( "unexpected exception on close ${e}", ("e", e) );
361  }
362  ilog( "done" );
363  }
364 
366  {
369  {
371  try
372  {
373  fc::json::save_to_file( _node_configuration, configuration_file_name );
374  }
375  catch (const fc::canceled_exception&)
376  {
377  throw;
378  }
379  catch ( const fc::exception& except )
380  {
381  elog( "error writing node configuration to file ${filename}: ${error}",
382  ( "filename", configuration_file_name )("error", except.to_detail_string() ) );
383  }
384  }
385  }
386 
388  {
391  {
392  try
393  {
394  dlog("Starting an iteration of p2p_network_connect_loop().");
396 
397  // add-once peers bypass our checks on the maximum/desired number of connections (but they will still be counted against the totals once they're connected)
398  if (!_add_once_node_list.empty())
399  {
400  std::list<potential_peer_record> add_once_node_list;
401  add_once_node_list.swap(_add_once_node_list);
402  dlog("Processing \"add once\" node list containing ${count} peers:", ("count", add_once_node_list.size()));
403  for (const potential_peer_record& add_once_peer : add_once_node_list)
404  {
405  dlog(" ${peer}", ("peer", add_once_peer.endpoint));
406  }
407  for (const potential_peer_record& add_once_peer : add_once_node_list)
408  {
409  // see if we have an existing connection to that peer. If we do, disconnect them and
410  // then try to connect the next time through the loop
411  peer_connection_ptr existing_connection_ptr = get_connection_to_endpoint( add_once_peer.endpoint );
412  if(!existing_connection_ptr)
413  connect_to_endpoint(add_once_peer.endpoint);
414  }
415  dlog("Done processing \"add once\" node list");
416  }
417 
419  {
420  bool initiated_connection_this_pass = false;
422 
425  ++iter)
426  {
427  fc::microseconds delay_until_retry = fc::seconds((iter->number_of_failed_connection_attempts + 1) * _peer_connection_retry_timeout);
428 
429  if (!is_connection_to_endpoint_in_progress(iter->endpoint) &&
430  ((iter->last_connection_disposition != last_connection_failed &&
431  iter->last_connection_disposition != last_connection_rejected &&
432  iter->last_connection_disposition != last_connection_handshaking_failed) ||
433  (fc::time_point::now() - iter->last_connection_attempt_time) > delay_until_retry))
434  {
435  connect_to_endpoint(iter->endpoint);
436  initiated_connection_this_pass = true;
437  }
438  }
439 
440  if (!initiated_connection_this_pass && !_potential_peer_database_updated)
441  break;
442  }
443 
445 
446  // if we broke out of the while loop, that means either we have connected to enough nodes, or
447  // we don't have any good candidates to connect to right now.
448 #if 0
449  try
450  {
451  _retrigger_connect_loop_promise = fc::promise<void>::create("graphene::net::retrigger_connect_loop");
453  {
455  dlog( "Still want to connect to more nodes, but I don't have any good candidates. Trying again in 15 seconds" );
456  else
457  dlog( "I still have some \"add once\" nodes to connect to. Trying again in 15 seconds" );
459  }
460  else
461  {
462  dlog( "I don't need any more connections, waiting forever until something changes" );
464  }
465  }
466  catch ( fc::timeout_exception& ) //intentionally not logged
467  {
468  } // catch
469 #else
470  fc::usleep(fc::seconds(10));
471 #endif
472  }
473  catch (const fc::canceled_exception&)
474  {
475  throw;
476  }
477  FC_CAPTURE_AND_LOG( (0) )
478  }// while(!canceled)
479  }
480 
482  {
484  dlog( "Triggering connect loop now" );
486  //if( _retrigger_connect_loop_promise )
487  // _retrigger_connect_loop_promise->set_value();
488  }
489 
491  {
493 
494  try
495  {
496  dlog("Starting an iteration of update_seed_nodes loop.");
497  for( const std::string& endpoint_string : _seed_nodes )
498  {
499  resolve_seed_node_and_add( endpoint_string );
500  }
501  dlog("Done an iteration of update_seed_nodes loop.");
502  }
503  catch (const fc::canceled_exception&)
504  {
505  throw;
506  }
508 
510  }
511 
513  {
515 
517  return;
518 
520  return;
521 
524  "update_seed_nodes_loop" );
525  }
526 
528  {
530  return std::find_if(_received_sync_items.begin(), _received_sync_items.end(),
531  [&item_hash]( const graphene::net::block_message& message ) { return message.block_id == item_hash; } ) != _received_sync_items.end() ||
532  std::find_if(_new_received_sync_items.begin(), _new_received_sync_items.end(),
533  [&item_hash]( const graphene::net::block_message& message ) { return message.block_id == item_hash; } ) != _new_received_sync_items.end(); ;
534  }
535 
536  void node_impl::request_sync_item_from_peer( const peer_connection_ptr& peer, const item_hash_t& item_to_request )
537  {
539  dlog( "requesting item ${item_hash} from peer ${endpoint}", ("item_hash", item_to_request )("endpoint", peer->get_remote_endpoint() ) );
540  item_id item_id_to_request( graphene::net::block_message_type, item_to_request );
541  _active_sync_requests.insert( active_sync_requests_map::value_type(item_to_request, fc::time_point::now() ) );
542  peer->last_sync_item_received_time = fc::time_point::now();
543  peer->sync_items_requested_from_peer.insert(item_to_request);
544  peer->send_message( fetch_items_message(item_id_to_request.item_type, std::vector<item_hash_t>{item_id_to_request.item_hash} ) );
545  }
546 
547  void node_impl::request_sync_items_from_peer( const peer_connection_ptr& peer, const std::vector<item_hash_t>& items_to_request )
548  {
550  dlog( "requesting ${item_count} item(s) ${items_to_request} from peer ${endpoint}",
551  ("item_count", items_to_request.size())("items_to_request", items_to_request)("endpoint", peer->get_remote_endpoint()) );
552  for (const item_hash_t& item_to_request : items_to_request)
553  {
554  _active_sync_requests.insert( active_sync_requests_map::value_type(item_to_request, fc::time_point::now() ) );
555  peer->last_sync_item_received_time = fc::time_point::now();
556  peer->sync_items_requested_from_peer.insert(item_to_request);
557  }
558  peer->send_message(fetch_items_message(graphene::net::block_message_type, items_to_request));
559  }
560 
562  {
565  {
567  dlog( "beginning another iteration of the sync items loop" );
568 
570  {
571  std::map<peer_connection_ptr, std::vector<item_hash_t> > sync_item_requests_to_send;
572 
573  {
574  std::set<item_hash_t> sync_items_to_request;
575 
576  // for each idle peer that we're syncing with
578  for( const peer_connection_ptr& peer : _active_connections )
579  {
580  if( peer->we_need_sync_items_from_peer &&
581  sync_item_requests_to_send.find(peer) == sync_item_requests_to_send.end() && // if we've already scheduled a request for this peer, don't consider scheduling another
582  peer->idle() )
583  {
584  if (!peer->inhibit_fetching_sync_blocks)
585  {
586  // loop through the items it has that we don't yet have on our blockchain
587  for( unsigned i = 0; i < peer->ids_of_items_to_get.size(); ++i )
588  {
589  item_hash_t item_to_potentially_request = peer->ids_of_items_to_get[i];
590  // if we don't already have this item in our temporary storage and we haven't requested from another syncing peer
591  if( !have_already_received_sync_item(item_to_potentially_request) && // already got it, but for some reson it's still in our list of items to fetch
592  sync_items_to_request.find(item_to_potentially_request) == sync_items_to_request.end() && // we have already decided to request it from another peer during this iteration
593  _active_sync_requests.find(item_to_potentially_request) == _active_sync_requests.end() ) // we've requested it in a previous iteration and we're still waiting for it to arrive
594  {
595  // then schedule a request from this peer
596  sync_item_requests_to_send[peer].push_back(item_to_potentially_request);
597  sync_items_to_request.insert( item_to_potentially_request );
598  if (sync_item_requests_to_send[peer].size() >= _maximum_blocks_per_peer_during_syncing)
599  break;
600  }
601  }
602  }
603  }
604  }
605  } // end non-preemptable section
606 
607  // make all the requests we scheduled in the loop above
608  for( auto sync_item_request : sync_item_requests_to_send )
609  request_sync_items_from_peer( sync_item_request.first, sync_item_request.second );
610  sync_item_requests_to_send.clear();
611  }
612  else
613  dlog("fetch_sync_items_loop is suspended pending backlog processing");
614 
616  {
617  dlog( "no sync items to fetch right now, going to sleep" );
618  _retrigger_fetch_sync_items_loop_promise = fc::promise<void>::create("graphene::net::retrigger_fetch_sync_items_loop");
621  }
622  } // while( !canceled )
623  }
624 
626  {
628  dlog( "Triggering fetch sync items loop now" );
632  }
633 
635  {
637  for( const peer_connection_ptr& peer : _active_connections )
638  {
639  if (peer->inventory_peer_advertised_to_us.find(item) != peer->inventory_peer_advertised_to_us.end() )
640  return true;
641  }
642  return false;
643  }
644 
646  {
649  {
650  _items_to_fetch_updated = false;
651  dlog("beginning an iteration of fetch items (${count} items to fetch)",
652  ("count", _items_to_fetch.size()));
653 
655  fc::time_point next_peer_unblocked_time = fc::time_point::maximum();
656 
657  // we need to construct a list of items to request from each peer first,
658  // then send the messages (in two steps, to avoid yielding while iterating)
659  // we want to evenly distribute our requests among our peers.
660  struct requested_item_count_index {};
661  struct peer_and_items_to_fetch
662  {
663  peer_connection_ptr peer;
664  std::vector<item_id> item_ids;
665  peer_and_items_to_fetch(const peer_connection_ptr& peer) : peer(peer) {}
666  bool operator<(const peer_and_items_to_fetch& rhs) const { return peer < rhs.peer; }
667  size_t number_of_items() const { return item_ids.size(); }
668  };
669  typedef boost::multi_index_container<peer_and_items_to_fetch,
670  boost::multi_index::indexed_by<boost::multi_index::ordered_unique<boost::multi_index::member<peer_and_items_to_fetch, peer_connection_ptr, &peer_and_items_to_fetch::peer> >,
671  boost::multi_index::ordered_non_unique<boost::multi_index::tag<requested_item_count_index>,
672  boost::multi_index::const_mem_fun<peer_and_items_to_fetch, size_t, &peer_and_items_to_fetch::number_of_items> > > > fetch_messages_to_send_set;
673  fetch_messages_to_send_set items_by_peer;
674 
675  // initialize the fetch_messages_to_send with an empty set of items for all idle peers
676  {
678  for (const peer_connection_ptr& peer : _active_connections)
679  if (peer->idle())
680  items_by_peer.insert(peer_and_items_to_fetch(peer));
681  }
682 
683  // now loop over all items we want to fetch
684  for (auto item_iter = _items_to_fetch.begin(); item_iter != _items_to_fetch.end();)
685  {
686  if (item_iter->timestamp < oldest_timestamp_to_fetch)
687  {
688  // this item has probably already fallen out of our peers' caches, we'll just ignore it.
689  // this can happen during flooding, and the _items_to_fetch could otherwise get clogged
690  // with a bunch of items that we'll never be able to request from any peer
691  wlog("Unable to fetch item ${item} before its likely expiration time, removing it from our list of items to fetch", ("item", item_iter->item));
692  item_iter = _items_to_fetch.erase(item_iter);
693  }
694  else
695  {
696  // find a peer that has it, we'll use the one who has the least requests going to it to load balance
697  bool item_fetched = false;
698  for (auto peer_iter = items_by_peer.get<requested_item_count_index>().begin(); peer_iter != items_by_peer.get<requested_item_count_index>().end(); ++peer_iter)
699  {
700  const peer_connection_ptr& peer = peer_iter->peer;
701  // if they have the item and we haven't already decided to ask them for too many other items
702  if (peer_iter->item_ids.size() < GRAPHENE_NET_MAX_ITEMS_PER_PEER_DURING_NORMAL_OPERATION &&
703  peer->inventory_peer_advertised_to_us.find(item_iter->item) != peer->inventory_peer_advertised_to_us.end())
704  {
705  if (item_iter->item.item_type == graphene::net::trx_message_type && peer->is_transaction_fetching_inhibited())
706  next_peer_unblocked_time = std::min(peer->transaction_fetching_inhibited_until, next_peer_unblocked_time);
707  else
708  {
709  //dlog("requesting item ${hash} from peer ${endpoint}",
710  // ("hash", iter->item.item_hash)("endpoint", peer->get_remote_endpoint()));
711  item_id item_id_to_fetch = item_iter->item;
712  peer->items_requested_from_peer.insert(peer_connection::item_to_time_map_type::value_type(item_id_to_fetch, fc::time_point::now()));
713  item_iter = _items_to_fetch.erase(item_iter);
714  item_fetched = true;
715  items_by_peer.get<requested_item_count_index>().modify(peer_iter, [&item_id_to_fetch](peer_and_items_to_fetch& peer_and_items) {
716  peer_and_items.item_ids.push_back(item_id_to_fetch);
717  });
718  break;
719  }
720  }
721  }
722  if (!item_fetched)
723  ++item_iter;
724  }
725  }
726 
727  // we've figured out which peer will be providing each item, now send the messages.
728  for (const peer_and_items_to_fetch& peer_and_items : items_by_peer)
729  {
730  // the item lists are heterogenous and
731  // the fetch_items_message can only deal with one item type at a time.
732  std::map<uint32_t, std::vector<item_hash_t> > items_to_fetch_by_type;
733  for (const item_id& item : peer_and_items.item_ids)
734  items_to_fetch_by_type[item.item_type].push_back(item.item_hash);
735  for (auto& items_by_type : items_to_fetch_by_type)
736  {
737  dlog("requesting ${count} items of type ${type} from peer ${endpoint}: ${hashes}",
738  ("count", items_by_type.second.size())("type", (uint32_t)items_by_type.first)
739  ("endpoint", peer_and_items.peer->get_remote_endpoint())
740  ("hashes", items_by_type.second));
741  peer_and_items.peer->send_message(fetch_items_message(items_by_type.first,
742  items_by_type.second));
743  }
744  }
745  items_by_peer.clear();
746 
748  {
749  _retrigger_fetch_item_loop_promise = fc::promise<void>::create("graphene::net::retrigger_fetch_item_loop");
750  fc::microseconds time_until_retrigger = fc::microseconds::maximum();
751  if (next_peer_unblocked_time != fc::time_point::maximum())
752  time_until_retrigger = next_peer_unblocked_time - fc::time_point::now();
753  try
754  {
755  if (time_until_retrigger > fc::microseconds(0))
756  _retrigger_fetch_item_loop_promise->wait(time_until_retrigger);
757  }
758  catch (const fc::timeout_exception&)
759  {
760  dlog("Resuming fetch_items_loop due to timeout -- one of our peers should no longer be throttled");
761  }
763  }
764  } // while (!canceled)
765  }
766 
768  {
773  }
774 
776  {
779  {
780  dlog("beginning an iteration of advertise inventory");
781  // swap inventory into local variable, clearing the node's copy
782  std::unordered_set<item_id> inventory_to_advertise;
783  inventory_to_advertise.swap(_new_inventory);
784 
785  // process all inventory to advertise and construct the inventory messages we'll send
786  // first, then send them all in a batch (to avoid any fiber interruption points while
787  // we're computing the messages)
788  std::list<std::pair<peer_connection_ptr, item_ids_inventory_message> > inventory_messages_to_send;
789  {
791  for (const peer_connection_ptr& peer : _active_connections)
792  {
793  // only advertise to peers who are in sync with us
794  idump((peer->peer_needs_sync_items_from_us));
795  if( !peer->peer_needs_sync_items_from_us )
796  {
797  std::map<uint32_t, std::vector<item_hash_t> > items_to_advertise_by_type;
798  // don't send the peer anything we've already advertised to it
799  // or anything it has advertised to us
800  // group the items we need to send by type, because we'll need to send one inventory message per type
801  unsigned total_items_to_send_to_this_peer = 0;
802  idump((inventory_to_advertise));
803  for (const item_id& item_to_advertise : inventory_to_advertise)
804  {
805  auto adv_to_peer = peer->inventory_advertised_to_peer.find(item_to_advertise);
806  auto adv_to_us = peer->inventory_peer_advertised_to_us.find(item_to_advertise);
807 
808  if (adv_to_peer == peer->inventory_advertised_to_peer.end() &&
809  adv_to_us == peer->inventory_peer_advertised_to_us.end())
810  {
811  items_to_advertise_by_type[item_to_advertise.item_type].push_back(item_to_advertise.item_hash);
812  peer->inventory_advertised_to_peer.insert(peer_connection::timestamped_item_id(item_to_advertise, fc::time_point::now()));
813  ++total_items_to_send_to_this_peer;
814  if (item_to_advertise.item_type == trx_message_type)
815  testnetlog("advertising transaction ${id} to peer ${endpoint}", ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint()));
816  dlog("advertising item ${id} to peer ${endpoint}", ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint()));
817  }
818  else
819  {
820  if (adv_to_peer != peer->inventory_advertised_to_peer.end() )
821  idump( (*adv_to_peer) );
822  if (adv_to_us != peer->inventory_peer_advertised_to_us.end() )
823  idump( (*adv_to_us) );
824  }
825  }
826  dlog("advertising ${count} new item(s) of ${types} type(s) to peer ${endpoint}",
827  ("count", total_items_to_send_to_this_peer)
828  ("types", items_to_advertise_by_type.size())
829  ("endpoint", peer->get_remote_endpoint()));
830  for (auto items_group : items_to_advertise_by_type)
831  inventory_messages_to_send.push_back(std::make_pair(peer, item_ids_inventory_message(items_group.first, items_group.second)));
832  }
833  peer->clear_old_inventory();
834  }
835  } // lock_guard
836 
837  for (auto iter = inventory_messages_to_send.begin(); iter != inventory_messages_to_send.end(); ++iter)
838  iter->first->send_message(iter->second);
839  inventory_messages_to_send.clear();
840 
841  if (_new_inventory.empty())
842  {
843  _retrigger_advertise_inventory_loop_promise = fc::promise<void>::create("graphene::net::retrigger_advertise_inventory_loop");
846  }
847  } // while(!canceled)
848  }
849 
851  {
855  }
856 
858  {
860  std::list<peer_connection_ptr> peers_to_disconnect_gently;
861  std::list<peer_connection_ptr> peers_to_disconnect_forcibly;
862  std::list<peer_connection_ptr> peers_to_send_keep_alive;
863  std::list<peer_connection_ptr> peers_to_terminate;
864 
865  _recent_block_interval_in_seconds = _delegate->get_current_block_interval_in_seconds();
866 
867  // Disconnect peers that haven't sent us any data recently
868  // These numbers are just guesses and we need to think through how this works better.
869  // If we and our peers get disconnected from the rest of the network, we will not
870  // receive any blocks or transactions from the rest of the world, and that will
871  // probably make us disconnect from our peers even though we have working connections to
872  // them (but they won't have sent us anything since they aren't getting blocks either).
873  // This might not be so bad because it could make us initiate more connections and
874  // reconnect with the rest of the network, or it might just futher isolate us.
875  // As usual, the first step is to walk through all our peers and figure out which
876  // peers need action (disconneting, sending keepalives, etc), then we walk through
877  // those lists yielding at our leisure later.
878 
879  uint32_t handshaking_timeout = _peer_inactivity_timeout;
880  fc::time_point handshaking_disconnect_threshold = fc::time_point::now() - fc::seconds(handshaking_timeout);
881  {
883  for( const peer_connection_ptr handshaking_peer : _handshaking_connections )
884  {
885  if( handshaking_peer->connection_initiation_time < handshaking_disconnect_threshold &&
886  handshaking_peer->get_last_message_received_time() < handshaking_disconnect_threshold &&
887  handshaking_peer->get_last_message_sent_time() < handshaking_disconnect_threshold )
888  {
889  wlog( "Forcibly disconnecting from handshaking peer ${peer} due to inactivity of at least ${timeout} seconds",
890  ( "peer", handshaking_peer->get_remote_endpoint() )("timeout", handshaking_timeout ) );
891  wlog("Peer's negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}",
892  ("status", handshaking_peer->negotiation_status)
893  ("sent", handshaking_peer->get_total_bytes_sent())
894  ("received", handshaking_peer->get_total_bytes_received()));
895  handshaking_peer->connection_closed_error = fc::exception(FC_LOG_MESSAGE(warn,
896  "Terminating handshaking connection due to inactivity of ${timeout} seconds. Negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}",
897  ("peer", handshaking_peer->get_remote_endpoint())
898  ("timeout", handshaking_timeout)
899  ("status", handshaking_peer->negotiation_status)
900  ("sent", handshaking_peer->get_total_bytes_sent())
901  ("received", handshaking_peer->get_total_bytes_received())));
902  peers_to_disconnect_forcibly.push_back( handshaking_peer );
903  } // if
904  } // for
905  } // scoped_lock
906  // timeout for any active peers is two block intervals
907  uint32_t active_disconnect_timeout = 10 * _recent_block_interval_in_seconds;
908  uint32_t active_send_keepalive_timeout = active_disconnect_timeout / 2;
909 
910  // set the ignored request time out to 6 second. When we request a block
911  // or transaction from a peer, this timeout determines how long we wait for them
912  // to reply before we give up and ask another peer for the item.
913  // Ideally this should be significantly shorter than the block interval, because
914  // we'd like to realize the block isn't coming and fetch it from a different
915  // peer before the next block comes in.
916  // Increased to 6 from 1 in #1660 due to heavy load. May need to adjust further
917  // Note: #1660 is https://github.com/steemit/steem/issues/1660
918  fc::microseconds active_ignored_request_timeout = fc::seconds(6);
919 
920  fc::time_point active_disconnect_threshold = fc::time_point::now() - fc::seconds(active_disconnect_timeout);
921  fc::time_point active_send_keepalive_threshold = fc::time_point::now() - fc::seconds(active_send_keepalive_timeout);
922  fc::time_point active_ignored_request_threshold = fc::time_point::now() - active_ignored_request_timeout;
923  {
925 
926  for( const peer_connection_ptr& active_peer : _active_connections )
927  {
928  if( active_peer->connection_initiation_time < active_disconnect_threshold &&
929  active_peer->get_last_message_received_time() < active_disconnect_threshold )
930  {
931  wlog( "Closing connection with peer ${peer} due to inactivity of at least ${timeout} seconds",
932  ( "peer", active_peer->get_remote_endpoint() )("timeout", active_disconnect_timeout ) );
933  peers_to_disconnect_gently.push_back( active_peer );
934  }
935  else
936  {
937  bool disconnect_due_to_request_timeout = false;
938  if (!active_peer->sync_items_requested_from_peer.empty() &&
939  active_peer->last_sync_item_received_time < active_ignored_request_threshold)
940  {
941  wlog("Disconnecting peer ${peer} because they haven't made any progress on my remaining ${count} sync item requests",
942  ("peer", active_peer->get_remote_endpoint())("count",
943  active_peer->sync_items_requested_from_peer.size()));
944  disconnect_due_to_request_timeout = true;
945  }
946  if (!disconnect_due_to_request_timeout &&
947  active_peer->item_ids_requested_from_peer &&
948  active_peer->item_ids_requested_from_peer->get<1>() < active_ignored_request_threshold)
949  {
950  wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ids after ${synopsis}",
951  ("peer", active_peer->get_remote_endpoint())
952  ("synopsis", active_peer->item_ids_requested_from_peer->get<0>()));
953  disconnect_due_to_request_timeout = true;
954  }
955  if (!disconnect_due_to_request_timeout)
956  for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->items_requested_from_peer)
957  if (item_and_time.second < active_ignored_request_threshold)
958  {
959  wlog("Disconnecting peer ${peer} because they didn't respond to my request for item ${id}",
960  ("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash));
961  disconnect_due_to_request_timeout = true;
962  break;
963  }
964  if (disconnect_due_to_request_timeout)
965  {
966  // we should probably disconnect nicely and give them a reason, but right now the logic
967  // for rescheduling the requests only executes when the connection is fully closed,
968  // and we want to get those requests rescheduled as soon as possible
969  peers_to_disconnect_forcibly.push_back(active_peer);
970  }
971  else if (active_peer->connection_initiation_time < active_send_keepalive_threshold &&
972  active_peer->get_last_message_received_time() < active_send_keepalive_threshold)
973  {
974  wlog( "Sending a keepalive message to peer ${peer} who hasn't sent us any messages in the last ${timeout} seconds",
975  ( "peer", active_peer->get_remote_endpoint() )("timeout", active_send_keepalive_timeout ) );
976  peers_to_send_keep_alive.push_back(active_peer);
977  }
978  else if (active_peer->we_need_sync_items_from_peer &&
979  !active_peer->is_currently_handling_message() &&
980  !active_peer->item_ids_requested_from_peer &&
981  active_peer->ids_of_items_to_get.empty())
982  {
983  // This is a state we should never get into in the first place, but if we do, we should disconnect the peer
984  // to re-establish the connection.
985  fc_wlog(fc::logger::get("sync"), "Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.",
986  ("peer", active_peer->get_remote_endpoint()));
987  wlog("Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.",
988  ("peer", active_peer->get_remote_endpoint()));
989  peers_to_disconnect_forcibly.push_back(active_peer);
990  }
991  } // else
992  } // for
993  } // scoped_lock
994 
996  {
998  for( const peer_connection_ptr& closing_peer : _closing_connections )
999  {
1000  if( closing_peer->connection_closed_time < closing_disconnect_threshold )
1001  {
1002  // we asked this peer to close their connectoin to us at least GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT
1003  // seconds ago, but they haven't done it yet. Terminate the connection now
1004  wlog( "Forcibly disconnecting peer ${peer} who failed to close their connection in a timely manner",
1005  ( "peer", closing_peer->get_remote_endpoint() ) );
1006  peers_to_disconnect_forcibly.push_back( closing_peer );
1007  }
1008  } // for
1009  } // scoped_lock
1010  uint32_t failed_terminate_timeout_seconds = 120;
1011  fc::time_point failed_terminate_threshold = fc::time_point::now() - fc::seconds(failed_terminate_timeout_seconds);
1012  {
1014  for (const peer_connection_ptr& peer : _terminating_connections )
1015  {
1016  if (peer->get_connection_terminated_time() != fc::time_point::min() &&
1017  peer->get_connection_terminated_time() < failed_terminate_threshold)
1018  {
1019  wlog("Terminating connection with peer ${peer}, closing the connection didn't work", ("peer", peer->get_remote_endpoint()));
1020  peers_to_terminate.push_back(peer);
1021  }
1022  }
1023  } // scoped_lock
1024  // That's the end of the sorting step; now all peers that require further processing are now in one of the
1025  // lists peers_to_disconnect_gently, peers_to_disconnect_forcibly, peers_to_send_keep_alive, or peers_to_terminate
1026 
1027  // if we've decided to delete any peers, do it now; in its current implementation this doesn't yield,
1028  // and once we start yielding, we may find that we've moved that peer to another list (closed or active)
1029  // and that triggers assertions, maybe even errors
1030  {
1032  for (const peer_connection_ptr& peer : peers_to_terminate )
1033  {
1037  }
1038  } // scoped_lock
1039  peers_to_terminate.clear();
1040 
1041  // if we're going to abruptly disconnect anyone, do it here
1042  // (it doesn't yield). I don't think there would be any harm if this were
1043  // moved to the yielding section
1044  for( const peer_connection_ptr& peer : peers_to_disconnect_forcibly )
1045  {
1047  peer->close_connection();
1048  }
1049  peers_to_disconnect_forcibly.clear();
1050 
1051  // Now process the peers that we need to do yielding functions with (disconnect sends a message with the
1052  // disconnect reason, so it may yield)
1053  for( const peer_connection_ptr& peer : peers_to_disconnect_gently )
1054  {
1055  {
1057  fc::exception detailed_error( FC_LOG_MESSAGE(warn, "Disconnecting due to inactivity",
1058  ( "last_message_received_seconds_ago", (peer->get_last_message_received_time()
1059  - fc::time_point::now() ).count() / fc::seconds(1 ).count() )
1060  ( "last_message_sent_seconds_ago", (peer->get_last_message_sent_time()
1061  - fc::time_point::now() ).count() / fc::seconds(1 ).count() )
1062  ( "inactivity_timeout", _active_connections.find(peer ) != _active_connections.end()
1064  disconnect_from_peer( peer.get(), "Disconnecting due to inactivity", false, detailed_error );
1065  }
1066  }
1067  peers_to_disconnect_gently.clear();
1068 
1069  for( const peer_connection_ptr& peer : peers_to_send_keep_alive )
1070  peer->send_message(current_time_request_message(),
1071  offsetof(current_time_request_message, request_sent_time));
1072  peers_to_send_keep_alive.clear();
1073 
1077  "terminate_inactive_connections_loop" );
1078  }
1079 
1081  {
1083 
1084  {
1086  // JMJ 2018-10-22 Unsure why we're making a copy here, but this is probably unnecessary
1087  std::list<peer_connection_ptr> original_active_peers(_active_connections.begin(), _active_connections.end());
1088  for( const peer_connection_ptr& active_peer : original_active_peers )
1089  {
1090  try
1091  {
1092  active_peer->send_message(address_request_message());
1093  }
1094  catch ( const fc::canceled_exception& )
1095  {
1096  throw;
1097  }
1098  catch (const fc::exception& e)
1099  {
1100  dlog("Caught exception while sending address request message to peer ${peer} : ${e}",
1101  ("peer", active_peer->get_remote_endpoint())("e", e));
1102  }
1103  }
1104  }
1105 
1106  // this has nothing to do with updating the peer list, but we need to prune this list
1107  // at regular intervals, this is a fine place to do it.
1108  fc::time_point_sec oldest_failed_ids_to_keep(fc::time_point::now() - fc::minutes(15));
1109  auto oldest_failed_ids_to_keep_iter = _recently_failed_items.get<peer_connection::timestamp_index>().lower_bound(oldest_failed_ids_to_keep);
1110  auto begin_iter = _recently_failed_items.get<peer_connection::timestamp_index>().begin();
1111  _recently_failed_items.get<peer_connection::timestamp_index>().erase(begin_iter, oldest_failed_ids_to_keep_iter);
1112 
1116  "fetch_updated_peer_lists_loop" );
1117  }
1118  void node_impl::update_bandwidth_data(uint32_t bytes_read_this_second, uint32_t bytes_written_this_second)
1119  {
1121  _average_network_read_speed_seconds.push_back(bytes_read_this_second);
1122  _average_network_write_speed_seconds.push_back(bytes_written_this_second);
1125  {
1128  uint32_t average_read_this_minute = (uint32_t)boost::accumulate(_average_network_read_speed_seconds, uint64_t(0)) / (uint32_t)_average_network_read_speed_seconds.size();
1129  _average_network_read_speed_minutes.push_back(average_read_this_minute);
1130  uint32_t average_written_this_minute = (uint32_t)boost::accumulate(_average_network_write_speed_seconds, uint64_t(0)) / (uint32_t)_average_network_write_speed_seconds.size();
1131  _average_network_write_speed_minutes.push_back(average_written_this_minute);
1133  {
1135  uint32_t average_read_this_hour = (uint32_t)boost::accumulate(_average_network_read_speed_minutes, uint64_t(0)) / (uint32_t)_average_network_read_speed_minutes.size();
1136  _average_network_read_speed_hours.push_back(average_read_this_hour);
1137  uint32_t average_written_this_hour = (uint32_t)boost::accumulate(_average_network_write_speed_minutes, uint64_t(0)) / (uint32_t)_average_network_write_speed_minutes.size();
1138  _average_network_write_speed_hours.push_back(average_written_this_hour);
1139  }
1140  }
1141  }
1143  {
1145  fc::time_point_sec current_time = fc::time_point::now();
1146 
1148  _bandwidth_monitor_last_update_time = current_time;
1149 
1150  uint32_t seconds_since_last_update = current_time.sec_since_epoch() - _bandwidth_monitor_last_update_time.sec_since_epoch();
1151  seconds_since_last_update = std::max(UINT32_C(1), seconds_since_last_update);
1152  uint32_t bytes_read_this_second = _rate_limiter.get_actual_download_rate();
1153  uint32_t bytes_written_this_second = _rate_limiter.get_actual_upload_rate();
1154  for (uint32_t i = 0; i < seconds_since_last_update - 1; ++i)
1155  update_bandwidth_data(0, 0);
1156  update_bandwidth_data(bytes_read_this_second, bytes_written_this_second);
1157  _bandwidth_monitor_last_update_time = current_time;
1158 
1162  "bandwidth_monitor_loop" );
1163  }
1164 
1166  {
1168  dump_node_status();
1172  "dump_node_status_task");
1173  }
1174 
1176  {
1178 #ifdef USE_PEERS_TO_DELETE_MUTEX
1179  fc::scoped_lock<fc::mutex> lock(_peers_to_delete_mutex);
1180  dlog("in delayed_peer_deletion_task with ${count} in queue", ("count", _peers_to_delete.size()));
1181  _peers_to_delete.clear();
1182  dlog("_peers_to_delete cleared");
1183 #else
1184  while (!_peers_to_delete.empty())
1185  {
1186  std::list<peer_connection_ptr> peers_to_delete_copy;
1187  dlog("beginning an iteration of delayed_peer_deletion_task with ${count} in queue", ("count", _peers_to_delete.size()));
1188  peers_to_delete_copy.swap(_peers_to_delete);
1189  }
1190  dlog("leaving delayed_peer_deletion_task");
1191 #endif
1192  }
1193 
1195  {
1197 
1198  assert(_handshaking_connections.find(peer_to_delete) == _handshaking_connections.end());
1199  assert(_active_connections.find(peer_to_delete) == _active_connections.end());
1200  assert(_closing_connections.find(peer_to_delete) == _closing_connections.end());
1201  assert(_terminating_connections.find(peer_to_delete) == _terminating_connections.end());
1202 
1203 #ifdef USE_PEERS_TO_DELETE_MUTEX
1204  dlog("scheduling peer for deletion: ${peer} (may block on a mutex here)", ("peer", peer_to_delete->get_remote_endpoint()));
1205 
1206  unsigned number_of_peers_to_delete;
1207  {
1208  fc::scoped_lock<fc::mutex> lock(_peers_to_delete_mutex);
1209  _peers_to_delete.emplace_back(peer_to_delete);
1210  number_of_peers_to_delete = _peers_to_delete.size();
1211  }
1212  dlog("peer scheduled for deletion: ${peer}", ("peer", peer_to_delete->get_remote_endpoint()));
1213 
1214  if (!_node_is_shutting_down &&
1216  {
1217  dlog("asyncing delayed_peer_deletion_task to delete ${size} peers", ("size", number_of_peers_to_delete));
1218  _delayed_peer_deletion_task_done = fc::async([this](){ delayed_peer_deletion_task(); }, "delayed_peer_deletion_task" );
1219  }
1220  else
1221  dlog("delayed_peer_deletion_task is already scheduled (current size of _peers_to_delete is ${size})", ("size", number_of_peers_to_delete));
1222 #else
1223  dlog("scheduling peer for deletion: ${peer} (this will not block)", ("peer", peer_to_delete->get_remote_endpoint()));
1224  _peers_to_delete.push_back(peer_to_delete);
1225  if (!_node_is_shutting_down &&
1227  {
1228  dlog("asyncing delayed_peer_deletion_task to delete ${size} peers", ("size", _peers_to_delete.size()));
1229  _delayed_peer_deletion_task_done = fc::async([this](){ delayed_peer_deletion_task(); }, "delayed_peer_deletion_task" );
1230  }
1231  else
1232  dlog("delayed_peer_deletion_task is already scheduled (current size of _peers_to_delete is ${size})", ("size", _peers_to_delete.size()));
1233 
1234 #endif
1235  }
1236 
1238  {
1241  }
1242 
1244  {
1247  }
1248 
1250  {
1252  return (uint32_t)(_handshaking_connections.size() + _active_connections.size());
1253  }
1254 
1256  {
1257  {
1259  for (const peer_connection_ptr& active_peer : _active_connections)
1260  if (node_id == active_peer->node_id)
1261  return active_peer;
1262  }
1263  {
1265  for (const peer_connection_ptr& handshaking_peer : _handshaking_connections)
1266  if (node_id == handshaking_peer->node_id)
1267  return handshaking_peer;
1268  }
1269  return peer_connection_ptr();
1270  }
1271 
1273  {
1275  if (node_id == _node_id)
1276  {
1277  dlog("is_already_connected_to_id returning true because the peer is us");
1278  return true;
1279  }
1280  {
1282  for (const peer_connection_ptr active_peer : _active_connections)
1283  {
1284  if (node_id == active_peer->node_id)
1285  {
1286  dlog("is_already_connected_to_id returning true because the peer is already in our active list");
1287  return true;
1288  }
1289  }
1290  }
1291  {
1293  for (const peer_connection_ptr handshaking_peer : _handshaking_connections)
1294  if (node_id == handshaking_peer->node_id)
1295  {
1296  dlog("is_already_connected_to_id returning true because the peer is already in our handshaking list");
1297  return true;
1298  }
1299  }
1300  return false;
1301  }
1302 
1303  // merge addresses received from a peer into our database
1304  bool node_impl::merge_address_info_with_potential_peer_database(const std::vector<address_info> addresses)
1305  {
1307  bool new_information_received = false;
1308  for (const address_info& address : addresses)
1309  {
1310  if (address.firewalled == graphene::net::firewalled_state::not_firewalled)
1311  {
1312  potential_peer_record updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_endpoint(address.remote_endpoint);
1313  if (address.last_seen_time > updated_peer_record.last_seen_time)
1314  new_information_received = true;
1315  updated_peer_record.last_seen_time = std::max(address.last_seen_time, updated_peer_record.last_seen_time);
1316  _potential_peer_db.update_entry(updated_peer_record);
1317  }
1318  }
1319  return new_information_received;
1320  }
1321 
1323  {
1325  dlog("Currently have ${current} of [${desired}/${max}] connections",
1326  ("current", get_number_of_connections())
1327  ("desired", _desired_number_of_connections)
1329  dlog(" my id is ${id}", ("id", _node_id));
1330 
1331  {
1333  for (const peer_connection_ptr& active_connection : _active_connections)
1334  {
1335  dlog(" active: ${endpoint} with ${id} [${direction}]",
1336  ("endpoint", active_connection->get_remote_endpoint())
1337  ("id", active_connection->node_id)
1338  ("direction", active_connection->direction));
1339  }
1340  }
1341  {
1343  for (const peer_connection_ptr& handshaking_connection : _handshaking_connections)
1344  {
1345  dlog(" handshaking: ${endpoint} with ${id} [${direction}]",
1346  ("endpoint", handshaking_connection->get_remote_endpoint())
1347  ("id", handshaking_connection->node_id)
1348  ("direction", handshaking_connection->direction));
1349  }
1350  }
1351  }
1352 
1353  void node_impl::on_message( peer_connection* originating_peer, const message& received_message )
1354  {
1356  message_hash_type message_hash = received_message.id();
1357  dlog("handling message ${type} ${hash} size ${size} from peer ${endpoint}",
1358  ("type", graphene::net::core_message_type_enum(received_message.msg_type.value()))("hash", message_hash)
1359  ("size", received_message.size)
1360  ("endpoint", originating_peer->get_remote_endpoint()));
1361  switch ( received_message.msg_type.value() )
1362  {
1364  on_hello_message(originating_peer, received_message.as<hello_message>());
1365  break;
1367  on_connection_accepted_message(originating_peer, received_message.as<connection_accepted_message>());
1368  break;
1370  on_connection_rejected_message(originating_peer, received_message.as<connection_rejected_message>());
1371  break;
1373  on_address_request_message(originating_peer, received_message.as<address_request_message>());
1374  break;
1376  on_address_message(originating_peer, received_message.as<address_message>());
1377  break;
1379  on_fetch_blockchain_item_ids_message(originating_peer, received_message.as<fetch_blockchain_item_ids_message>());
1380  break;
1383  break;
1385  on_fetch_items_message(originating_peer, received_message.as<fetch_items_message>());
1386  break;
1388  on_item_not_available_message(originating_peer, received_message.as<item_not_available_message>());
1389  break;
1391  on_item_ids_inventory_message(originating_peer, received_message.as<item_ids_inventory_message>());
1392  break;
1394  on_closing_connection_message(originating_peer, received_message.as<closing_connection_message>());
1395  break;
1397  process_block_message(originating_peer, received_message, message_hash);
1398  break;
1400  on_current_time_request_message(originating_peer, received_message.as<current_time_request_message>());
1401  break;
1403  on_current_time_reply_message(originating_peer, received_message.as<current_time_reply_message>());
1404  break;
1406  on_check_firewall_message(originating_peer, received_message.as<check_firewall_message>());
1407  break;
1409  on_check_firewall_reply_message(originating_peer, received_message.as<check_firewall_reply_message>());
1410  break;
1413  break;
1416  break;
1417 
1418  default:
1419  // ignore any message in between core_message_type_first and _last that we don't handle above
1420  // to allow us to add messages in the future
1421  if (received_message.msg_type.value() < core_message_type_enum::core_message_type_first ||
1422  received_message.msg_type.value() > core_message_type_enum::core_message_type_last)
1423  process_ordinary_message(originating_peer, received_message, message_hash);
1424  break;
1425  }
1426  }
1427 
1428 
1430  {
1432  // for the time being, shoehorn a bunch of properties into the user_data variant object,
1433  // which lets us add and remove fields without changing the protocol. Once we
1434  // settle on what we really want in there, we'll likely promote them to first
1435  // class fields in the hello message
1436  fc::mutable_variant_object user_data;
1437  user_data["fc_git_revision_sha"] = fc::git_revision_sha;
1438  user_data["fc_git_revision_unix_timestamp"] = fc::git_revision_unix_timestamp;
1439 #if defined( __APPLE__ )
1440  user_data["platform"] = "osx";
1441 #elif defined( __OpenBSD__ )
1442  user_data["platform"] = "obsd";
1443 #elif defined( __linux__ )
1444  user_data["platform"] = "linux";
1445 #elif defined( _MSC_VER )
1446  user_data["platform"] = "win32";
1447 #else
1448  user_data["platform"] = "other";
1449 #endif
1450  user_data["bitness"] = sizeof(void*) * 8;
1451 
1452  user_data["node_id"] = fc::variant( _node_id, 1 );
1453 
1454  item_hash_t head_block_id = _delegate->get_head_block_id();
1455  user_data["last_known_block_hash"] = fc::variant( head_block_id, 1 );
1456  user_data["last_known_block_number"] = _delegate->get_block_number(head_block_id);
1457  user_data["last_known_block_time"] = _delegate->get_block_time(head_block_id);
1458 
1459  if (!_hard_fork_block_numbers.empty())
1460  user_data["last_known_fork_block_number"] = _hard_fork_block_numbers.back();
1461 
1462  return user_data;
1463  }
1465  {
1467  // try to parse data out of the user_agent string
1468  if (user_data.contains("graphene_git_revision_sha"))
1469  originating_peer->graphene_git_revision_sha = user_data["graphene_git_revision_sha"].as_string();
1470  if (user_data.contains("graphene_git_revision_unix_timestamp"))
1471  originating_peer->graphene_git_revision_unix_timestamp = fc::time_point_sec(user_data["graphene_git_revision_unix_timestamp"].as<uint32_t>(1));
1472  if (user_data.contains("fc_git_revision_sha"))
1473  originating_peer->fc_git_revision_sha = user_data["fc_git_revision_sha"].as_string();
1474  if (user_data.contains("fc_git_revision_unix_timestamp"))
1475  originating_peer->fc_git_revision_unix_timestamp = fc::time_point_sec(user_data["fc_git_revision_unix_timestamp"].as<uint32_t>(1));
1476  if (user_data.contains("platform"))
1477  originating_peer->platform = user_data["platform"].as_string();
1478  if (user_data.contains("bitness"))
1479  originating_peer->bitness = user_data["bitness"].as<uint32_t>(1);
1480  if (user_data.contains("node_id"))
1481  originating_peer->node_id = user_data["node_id"].as<node_id_t>(1);
1482  if (user_data.contains("last_known_fork_block_number"))
1483  originating_peer->last_known_fork_block_number = user_data["last_known_fork_block_number"].as<uint32_t>(1);
1484  }
1485 
1486  void node_impl::on_hello_message( peer_connection* originating_peer, const hello_message& hello_message_received )
1487  {
1489  // this already_connected check must come before we fill in peer data below
1490  node_id_t peer_node_id = hello_message_received.node_public_key;
1491  try
1492  {
1493  peer_node_id = hello_message_received.user_data["node_id"].as<node_id_t>(1);
1494  }
1495  catch (const fc::exception&)
1496  {
1497  // either it's not there or it's not a valid session id. either way, ignore.
1498  }
1499  bool already_connected_to_this_peer = is_already_connected_to_id(peer_node_id);
1500 
1501  // validate the node id
1502  fc::sha256::encoder shared_secret_encoder;
1503  fc::sha512 shared_secret = originating_peer->get_shared_secret();
1504  shared_secret_encoder.write(shared_secret.data(), sizeof(shared_secret));
1505  fc::ecc::public_key expected_node_public_key(hello_message_received.signed_shared_secret, shared_secret_encoder.result(), false);
1506 
1507  // store off the data provided in the hello message
1508  originating_peer->user_agent = hello_message_received.user_agent;
1509  originating_peer->node_public_key = hello_message_received.node_public_key;
1510  originating_peer->node_id = hello_message_received.node_public_key; // will probably be overwritten in parse_hello_user_data_for_peer()
1511  originating_peer->core_protocol_version = hello_message_received.core_protocol_version;
1512  originating_peer->inbound_address = hello_message_received.inbound_address;
1513  originating_peer->inbound_port = hello_message_received.inbound_port;
1514  originating_peer->outbound_port = hello_message_received.outbound_port;
1515 
1516  parse_hello_user_data_for_peer(originating_peer, hello_message_received.user_data);
1517 
1518  // if they didn't provide a last known fork, try to guess it
1519  if (originating_peer->last_known_fork_block_number == 0 &&
1520  originating_peer->graphene_git_revision_unix_timestamp)
1521  {
1522  uint32_t unix_timestamp = originating_peer->graphene_git_revision_unix_timestamp->sec_since_epoch();
1523  originating_peer->last_known_fork_block_number = _delegate->estimate_last_known_fork_from_git_revision_timestamp(unix_timestamp);
1524  }
1525 
1526  // now decide what to do with it
1528  {
1529  if (hello_message_received.node_public_key != expected_node_public_key.serialize())
1530  {
1531  wlog("Invalid signature in hello message from peer ${peer}", ("peer", originating_peer->get_remote_endpoint()));
1532  std::string rejection_message("Invalid signature in hello message");
1534  originating_peer->get_socket().remote_endpoint(),
1536  rejection_message);
1537 
1539  originating_peer->send_message( message(connection_rejected ) );
1540  // for this type of message, we're immediately disconnecting this peer
1541  disconnect_from_peer( originating_peer, "Invalid signature in hello message" );
1542  return;
1543  }
1544  if (hello_message_received.chain_id != _chain_id)
1545  {
1546  wlog("Received hello message from peer on a different chain: ${message}", ("message", hello_message_received));
1547  std::ostringstream rejection_message;
1548  rejection_message << "You're on a different chain than I am. I'm on " << _chain_id.str() <<
1549  " and you're on " << hello_message_received.chain_id.str();
1551  originating_peer->get_socket().remote_endpoint(),
1553  rejection_message.str());
1554 
1556  originating_peer->send_message(message(connection_rejected));
1557  // for this type of message, we're immediately disconnecting this peer, instead of trying to
1558  // allowing her to ask us for peers (any of our peers will be on the same chain as us, so there's no
1559  // benefit of sharing them)
1560  disconnect_from_peer(originating_peer, "You are on a different chain from me");
1561  return;
1562  }
1563  if (originating_peer->last_known_fork_block_number != 0)
1564  {
1565  uint32_t next_fork_block_number = get_next_known_hard_fork_block_number(originating_peer->last_known_fork_block_number);
1566  if (next_fork_block_number != 0)
1567  {
1568  // we know about a fork they don't. See if we've already passed that block. If we have, don't let them
1569  // connect because we won't be able to give them anything useful
1570  uint32_t head_block_num = _delegate->get_block_number(_delegate->get_head_block_id());
1571  if (next_fork_block_number < head_block_num)
1572  {
1573 #ifdef ENABLE_DEBUG_ULOGS
1574  ulog("Rejecting connection from peer because their version is too old. Their version date: ${date}", ("date", originating_peer->graphene_git_revision_unix_timestamp));
1575 #endif
1576  wlog("Received hello message from peer running a version of that can only understand blocks up to #${their_hard_fork}, but I'm at head block number #${my_block_number}",
1577  ("their_hard_fork", next_fork_block_number)("my_block_number", head_block_num));
1578  std::ostringstream rejection_message;
1579  rejection_message << "Your client is outdated -- you can only understand blocks up to #" << next_fork_block_number << ", but I'm already on block #" << head_block_num;
1581  originating_peer->get_socket().remote_endpoint(),
1583  rejection_message.str() );
1584 
1586  originating_peer->send_message(message(connection_rejected));
1587  // for this type of message, we're immediately disconnecting this peer, instead of trying to
1588  // allowing her to ask us for peers (any of our peers will be on the same chain as us, so there's no
1589  // benefit of sharing them)
1590  disconnect_from_peer(originating_peer, "Your client is too old, please upgrade");
1591  return;
1592  }
1593  }
1594  }
1595  if (already_connected_to_this_peer)
1596  {
1597 
1598  connection_rejected_message connection_rejected;
1599  if (_node_id == originating_peer->node_id)
1601  originating_peer->get_socket().remote_endpoint(),
1603  "I'm connecting to myself");
1604  else
1606  originating_peer->get_socket().remote_endpoint(),
1608  "I'm already connected to you");
1610  originating_peer->send_message(message(connection_rejected));
1611  dlog("Received a hello_message from peer ${peer} that I'm already connected to (with id ${id}), rejection",
1612  ("peer", originating_peer->get_remote_endpoint())
1613  ("id", originating_peer->node_id));
1614  }
1615 #ifdef ENABLE_P2P_DEBUGGING_API
1616  else if(!_allowed_peers.empty() &&
1617  _allowed_peers.find(originating_peer->node_id) == _allowed_peers.end())
1618  {
1620  originating_peer->get_socket().remote_endpoint(),
1622  "you are not in my allowed_peers list");
1624  originating_peer->send_message( message(connection_rejected ) );
1625  dlog( "Received a hello_message from peer ${peer} who isn't in my allowed_peers list, rejection", ("peer", originating_peer->get_remote_endpoint() ) );
1626  }
1627 #endif // ENABLE_P2P_DEBUGGING_API
1628  else
1629  {
1630  // whether we're planning on accepting them as a peer or not, they seem to be a valid node,
1631  // so add them to our database if they're not firewalled
1632 
1633  // in the hello message, the peer sent us the IP address and port it thought it was connecting from.
1634  // If they match the IP and port we see, we assume that they're actually on the internet and they're not
1635  // firewalled.
1636  fc::ip::endpoint peers_actual_outbound_endpoint = originating_peer->get_socket().remote_endpoint();
1637  if( peers_actual_outbound_endpoint.get_address() == originating_peer->inbound_address &&
1638  peers_actual_outbound_endpoint.port() == originating_peer->outbound_port )
1639  {
1640  if( originating_peer->inbound_port == 0 )
1641  {
1642  dlog( "peer does not appear to be firewalled, but they did not give an inbound port so I'm treating them as if they are." );
1643  originating_peer->is_firewalled = firewalled_state::firewalled;
1644  }
1645  else
1646  {
1647  // peer is not firewalled, add it to our database
1648  fc::ip::endpoint peers_inbound_endpoint(originating_peer->inbound_address, originating_peer->inbound_port);
1649  potential_peer_record updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_endpoint(peers_inbound_endpoint);
1650  _potential_peer_db.update_entry(updated_peer_record);
1651  originating_peer->is_firewalled = firewalled_state::not_firewalled;
1652  }
1653  }
1654  else
1655  {
1656  dlog("peer is firewalled: they think their outbound endpoint is ${reported_endpoint}, but I see it as ${actual_endpoint}",
1657  ("reported_endpoint", fc::ip::endpoint(originating_peer->inbound_address, originating_peer->outbound_port))
1658  ("actual_endpoint", peers_actual_outbound_endpoint));
1659  originating_peer->is_firewalled = firewalled_state::firewalled;
1660  }
1661 
1663  {
1665  originating_peer->get_socket().remote_endpoint(),
1667  "not accepting any more incoming connections");
1669  originating_peer->send_message(message(connection_rejected));
1670  dlog("Received a hello_message from peer ${peer}, but I'm not accepting any more connections, rejection",
1671  ("peer", originating_peer->get_remote_endpoint()));
1672  }
1673  else
1674  {
1676  originating_peer->send_message(message(connection_accepted_message()));
1677  dlog("Received a hello_message from peer ${peer}, sending reply to accept connection",
1678  ("peer", originating_peer->get_remote_endpoint()));
1679  }
1680  }
1681  }
1682  else
1683  {
1684  // we can wind up here if we've connected to ourself, and the source and
1685  // destination endpoints are the same, causing messages we send out
1686  // to arrive back on the initiating socket instead of the receiving
1687  // socket. If we did a complete job of enumerating local addresses,
1688  // we could avoid directly connecting to ourselves, or at least detect
1689  // immediately when we did it and disconnect.
1690 
1691  // The only way I know of that we'd get an unexpected hello that we
1692  // can't really guard against is if we do a simulatenous open, we
1693  // probably need to think through that case. We're not attempting that
1694  // yet, though, so it's ok to just disconnect here.
1695  wlog("unexpected hello_message from peer, disconnecting");
1696  disconnect_from_peer(originating_peer, "Received a unexpected hello_message");
1697  }
1698  }
1699 
1700  void node_impl::on_connection_accepted_message(peer_connection* originating_peer, const connection_accepted_message& connection_accepted_message_received)
1701  {
1703  dlog("Received a connection_accepted in response to my \"hello\" from ${peer}", ("peer", originating_peer->get_remote_endpoint()));
1706  originating_peer->send_message(address_request_message());
1710  originating_peer->core_protocol_version >= 106)
1711  {
1712  wlog("I don't know if I'm firewalled. Sending a firewall check message to peer ${peer}",
1713  ("peer", originating_peer->get_remote_endpoint()));
1714  originating_peer->firewall_check_state = new firewall_check_state_data;
1715 
1716  originating_peer->send_message(check_firewall_message());
1718  }
1719  }
1720 
1721  void node_impl::on_connection_rejected_message(peer_connection* originating_peer, const connection_rejected_message& connection_rejected_message_received)
1722  {
1725  {
1726  ilog("Received a rejection from ${peer} in response to my \"hello\", reason: \"${reason}\"",
1727  ("peer", originating_peer->get_remote_endpoint())
1728  ("reason", connection_rejected_message_received.reason_string));
1729 
1730  if (connection_rejected_message_received.reason_code == rejection_reason_code::connected_to_self)
1731  {
1732  _potential_peer_db.erase(originating_peer->get_socket().remote_endpoint());
1733  move_peer_to_closing_list(originating_peer->shared_from_this());
1734  originating_peer->close_connection();
1735  }
1736  else
1737  {
1738  // update our database to record that we were rejected so we won't try to connect again for a while
1739  // this only happens on connections we originate, so we should already know that peer is not firewalled
1741  if (updated_peer_record)
1742  {
1743  updated_peer_record->last_connection_disposition = last_connection_rejected;
1744  updated_peer_record->last_connection_attempt_time = fc::time_point::now();
1745  _potential_peer_db.update_entry(*updated_peer_record);
1746  }
1747  }
1748 
1751  originating_peer->send_message(address_request_message());
1752  }
1753  else
1754  FC_THROW( "unexpected connection_rejected_message from peer" );
1755  }
1756 
1757  void node_impl::on_address_request_message(peer_connection* originating_peer, const address_request_message& address_request_message_received)
1758  {
1760  dlog("Received an address request message");
1761 
1762  address_message reply;
1764  {
1765  reply.addresses.reserve(_active_connections.size());
1767  for (const peer_connection_ptr& active_peer : _active_connections)
1768  {
1769  fc::optional<potential_peer_record> updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*active_peer->get_remote_endpoint());
1770  if (updated_peer_record)
1771  {
1772  updated_peer_record->last_seen_time = fc::time_point::now();
1773  _potential_peer_db.update_entry(*updated_peer_record);
1774  }
1775 
1776  reply.addresses.emplace_back(address_info(*active_peer->get_remote_endpoint(),
1778  active_peer->round_trip_delay,
1779  active_peer->node_id,
1780  active_peer->direction,
1781  active_peer->is_firewalled));
1782  }
1783  }
1784  originating_peer->send_message(reply);
1785  }
1786 
1787  void node_impl::on_address_message(peer_connection* originating_peer, const address_message& address_message_received)
1788  {
1790  dlog("Received an address message containing ${size} addresses", ("size", address_message_received.addresses.size()));
1791  for (const address_info& address : address_message_received.addresses)
1792  {
1793  dlog(" ${endpoint} last seen ${time}", ("endpoint", address.remote_endpoint)("time", address.last_seen_time));
1794  }
1795  std::vector<graphene::net::address_info> updated_addresses = address_message_received.addresses;
1796  for (address_info& address : updated_addresses)
1798  bool new_information_received = merge_address_info_with_potential_peer_database(updated_addresses);
1799  if (new_information_received)
1801 
1802  if (_handshaking_connections.find(originating_peer->shared_from_this()) != _handshaking_connections.end())
1803  {
1804  // if we were handshaking, we need to continue with the next step in handshaking (which is either
1805  // ending handshaking and starting synchronization or disconnecting)
1807  disconnect_from_peer(originating_peer, "You rejected my connection request (hello message) so I'm disconnecting");
1809  disconnect_from_peer(originating_peer, "I rejected your connection request (hello message) so I'm disconnecting");
1810  else
1811  {
1812  fc::optional<fc::ip::endpoint> inbound_endpoint = originating_peer->get_endpoint_for_connecting();
1813  if (inbound_endpoint)
1814  {
1815  // mark the connection as successful in the database
1816  fc::optional<potential_peer_record> updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint);
1817  if (updated_peer_record)
1818  {
1819  updated_peer_record->last_connection_disposition = last_connection_succeeded;
1820  _potential_peer_db.update_entry(*updated_peer_record);
1821  }
1822  }
1823 
1825  move_peer_to_active_list(originating_peer->shared_from_this());
1826  new_peer_just_added(originating_peer->shared_from_this());
1827  }
1828  }
1829  // else if this was an active connection, then this was just a reply to our periodic address requests.
1830  // we've processed it, there's nothing else to do
1831  }
1832 
1834  const fetch_blockchain_item_ids_message& fetch_blockchain_item_ids_message_received)
1835  {
1837  item_id peers_last_item_seen = item_id(fetch_blockchain_item_ids_message_received.item_type, item_hash_t());
1838  if (fetch_blockchain_item_ids_message_received.blockchain_synopsis.empty())
1839  {
1840  dlog("sync: received a request for item ids starting at the beginning of the chain from peer ${peer_endpoint} (full request: ${synopsis})",
1841  ("peer_endpoint", originating_peer->get_remote_endpoint())
1842  ("synopsis", fetch_blockchain_item_ids_message_received.blockchain_synopsis));
1843  }
1844  else
1845  {
1846  item_hash_t peers_last_item_hash_seen = fetch_blockchain_item_ids_message_received.blockchain_synopsis.back();
1847  dlog("sync: received a request for item ids after ${last_item_seen} from peer ${peer_endpoint} (full request: ${synopsis})",
1848  ("last_item_seen", peers_last_item_hash_seen)
1849  ("peer_endpoint", originating_peer->get_remote_endpoint())
1850  ("synopsis", fetch_blockchain_item_ids_message_received.blockchain_synopsis));
1851  peers_last_item_seen.item_hash = peers_last_item_hash_seen;
1852  }
1853 
1855  reply_message.item_type = fetch_blockchain_item_ids_message_received.item_type;
1856  reply_message.total_remaining_item_count = 0;
1857  try
1858  {
1859  reply_message.item_hashes_available = _delegate->get_block_ids(fetch_blockchain_item_ids_message_received.blockchain_synopsis,
1860  reply_message.total_remaining_item_count);
1861  }
1862  catch (const peer_is_on_an_unreachable_fork&)
1863  {
1864  dlog("Peer is on a fork and there's no set of blocks we can provide to switch them to our fork");
1865  // we reply with an empty list as if we had an empty blockchain;
1866  // we don't want to disconnect because they may be able to provide
1867  // us with blocks on their chain
1868  }
1869 
1870  bool disconnect_from_inhibited_peer = false;
1871  // if our client doesn't have any items after the item the peer requested, it will send back
1872  // a list containing the last item the peer requested
1873  idump((reply_message)(fetch_blockchain_item_ids_message_received.blockchain_synopsis));
1874  if( reply_message.item_hashes_available.empty() )
1875  originating_peer->peer_needs_sync_items_from_us = false; /* I have no items in my blockchain */
1876  else if( !fetch_blockchain_item_ids_message_received.blockchain_synopsis.empty() &&
1877  reply_message.item_hashes_available.size() == 1 &&
1878  std::find(fetch_blockchain_item_ids_message_received.blockchain_synopsis.begin(),
1879  fetch_blockchain_item_ids_message_received.blockchain_synopsis.end(),
1880  reply_message.item_hashes_available.back() ) != fetch_blockchain_item_ids_message_received.blockchain_synopsis.end() )
1881  {
1882  /* the last item in the peer's list matches the last item in our list */
1883  originating_peer->peer_needs_sync_items_from_us = false;
1884  if (originating_peer->inhibit_fetching_sync_blocks)
1885  disconnect_from_inhibited_peer = true; // delay disconnecting until after we send our reply to this fetch_blockchain_item_ids_message
1886  }
1887  else
1888  originating_peer->peer_needs_sync_items_from_us = true;
1889 
1890  if (!originating_peer->peer_needs_sync_items_from_us)
1891  {
1892  dlog("sync: peer is already in sync with us");
1893  // if we thought we had all the items this peer had, but now it turns out that we don't
1894  // have the last item it requested to send from,
1895  // we need to kick off another round of synchronization
1896  if (!originating_peer->we_need_sync_items_from_peer &&
1897  !fetch_blockchain_item_ids_message_received.blockchain_synopsis.empty() &&
1898  !_delegate->has_item(peers_last_item_seen))
1899  {
1900  dlog("sync: restarting sync with peer ${peer}", ("peer", originating_peer->get_remote_endpoint()));
1901  start_synchronizing_with_peer(originating_peer->shared_from_this());
1902  }
1903  }
1904  else
1905  {
1906  dlog("sync: peer is out of sync, sending peer ${count} items ids: first: ${first_item_id}, last: ${last_item_id}",
1907  ("count", reply_message.item_hashes_available.size())
1908  ("first_item_id", reply_message.item_hashes_available.front())
1909  ("last_item_id", reply_message.item_hashes_available.back()));
1910  if (!originating_peer->we_need_sync_items_from_peer &&
1911  !fetch_blockchain_item_ids_message_received.blockchain_synopsis.empty() &&
1912  !_delegate->has_item(peers_last_item_seen))
1913  {
1914  dlog("sync: restarting sync with peer ${peer}", ("peer", originating_peer->get_remote_endpoint()));
1915  start_synchronizing_with_peer(originating_peer->shared_from_this());
1916  }
1917  }
1918  originating_peer->send_message(reply_message);
1919 
1920  if (disconnect_from_inhibited_peer)
1921  {
1922  // the peer has all of our blocks, and we don't want any of theirs, so disconnect them
1923  disconnect_from_peer(originating_peer, "you are on a fork that I'm unable to switch to");
1924  return;
1925  }
1926 
1927  if (originating_peer->direction == peer_connection_direction::inbound &&
1928  _handshaking_connections.find(originating_peer->shared_from_this()) != _handshaking_connections.end())
1929  {
1930  // handshaking is done, move the connection to fully active status and start synchronizing
1931  dlog("peer ${endpoint} which was handshaking with us has started synchronizing with us, start syncing with it",
1932  ("endpoint", originating_peer->get_remote_endpoint()));
1933  fc::optional<fc::ip::endpoint> inbound_endpoint = originating_peer->get_endpoint_for_connecting();
1934  if (inbound_endpoint)
1935  {
1936  // mark the connection as successful in the database
1937  potential_peer_record updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_endpoint(*inbound_endpoint);
1939  _potential_peer_db.update_entry(updated_peer_record);
1940  }
1941 
1942  // transition it to our active list
1943  move_peer_to_active_list(originating_peer->shared_from_this());
1944  new_peer_just_added(originating_peer->shared_from_this());
1945  }
1946  }
1947 
1949  {
1951  uint32_t max_number_of_unfetched_items = 0;
1953  for( const peer_connection_ptr& peer : _active_connections )
1954  {
1955  uint32_t this_peer_number_of_unfetched_items = (uint32_t)peer->ids_of_items_to_get.size() + peer->number_of_unfetched_item_ids;
1956  max_number_of_unfetched_items = std::max(max_number_of_unfetched_items,
1957  this_peer_number_of_unfetched_items);
1958  }
1959  return max_number_of_unfetched_items;
1960  }
1961 
1962  // get a blockchain synopsis that makes sense to send to the given peer.
1963  // If the peer isn't yet syncing with us, this is just a synopsis of our active blockchain
1964  // If the peer is syncing with us, it is a synopsis of our active blockchain plus the
1965  // blocks the peer has already told us it has
1966  std::vector<item_hash_t> node_impl::create_blockchain_synopsis_for_peer( const peer_connection* peer )
1967  {
1969  item_hash_t reference_point = peer->last_block_delegate_has_seen;
1970 
1971  // when we call _delegate->get_blockchain_synopsis(), we may yield and there's a
1972  // chance this peer's state will change before we get control back. Save off
1973  // the stuff necessary for generating the synopsis.
1974  // This is pretty expensive, we should find a better way to do this
1975  std::vector<item_hash_t> original_ids_of_items_to_get(peer->ids_of_items_to_get.begin(), peer->ids_of_items_to_get.end());
1976  uint32_t number_of_blocks_after_reference_point = original_ids_of_items_to_get.size();
1977 
1978  std::vector<item_hash_t> synopsis = _delegate->get_blockchain_synopsis(reference_point, number_of_blocks_after_reference_point);
1979 
1980 #if 0
1981  // just for debugging, enable this and set a breakpoint to step through
1982  if (synopsis.empty())
1983  synopsis = _delegate->get_blockchain_synopsis(reference_point, number_of_blocks_after_reference_point);
1984 
1985  // TODO: it's possible that the returned synopsis is empty if the blockchain is empty (that's fine)
1986  // or if the reference point is now past our undo history (that's not).
1987  // in the second case, we should mark this peer as one we're unable to sync with and
1988  // disconnect them.
1989  if (reference_point != item_hash_t() && synopsis.empty())
1990  FC_THROW_EXCEPTION(block_older_than_undo_history, "You are on a fork I'm unable to switch to");
1991 #endif
1992 
1993  if( number_of_blocks_after_reference_point )
1994  {
1995  // then the synopsis is incomplete, add the missing elements from ids_of_items_to_get
1996  uint32_t first_block_num_in_ids_to_get = _delegate->get_block_number(original_ids_of_items_to_get.front());
1997  uint32_t true_high_block_num = first_block_num_in_ids_to_get + original_ids_of_items_to_get.size() - 1;
1998 
1999  // in order to generate a seamless synopsis, we need to be using the same low_block_num as the
2000  // backend code; the first block in the synopsis will be the low block number it used
2001  uint32_t low_block_num = synopsis.empty() ? 1 : _delegate->get_block_number(synopsis.front());
2002 
2003  do
2004  {
2005  if( low_block_num >= first_block_num_in_ids_to_get )
2006  synopsis.push_back(original_ids_of_items_to_get[low_block_num - first_block_num_in_ids_to_get]);
2007  low_block_num += (true_high_block_num - low_block_num + 2 ) / 2;
2008  }
2009  while ( low_block_num <= true_high_block_num );
2010  assert(synopsis.back() == original_ids_of_items_to_get.back());
2011  }
2012  return synopsis;
2013  }
2014 
2015  void node_impl::fetch_next_batch_of_item_ids_from_peer( peer_connection* peer, bool reset_fork_tracking_data_for_peer /* = false */ )
2016  {
2018  if( reset_fork_tracking_data_for_peer )
2019  {
2021  peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hash_t());
2022  }
2023 
2024  fc::oexception synopsis_exception;
2025  try
2026  {
2027  std::vector<item_hash_t> blockchain_synopsis = create_blockchain_synopsis_for_peer( peer );
2028 
2029  item_hash_t last_item_seen = blockchain_synopsis.empty() ? item_hash_t() : blockchain_synopsis.back();
2030  dlog( "sync: sending a request for the next items after ${last_item_seen} to peer ${peer}, (full request is ${blockchain_synopsis})",
2031  ( "last_item_seen", last_item_seen )
2032  ( "peer", peer->get_remote_endpoint() )
2033  ( "blockchain_synopsis", blockchain_synopsis ) );
2034  peer->item_ids_requested_from_peer = boost::make_tuple( blockchain_synopsis, fc::time_point::now() );
2035  peer->send_message( fetch_blockchain_item_ids_message(_sync_item_type, blockchain_synopsis ) );
2036  }
2037  catch (const block_older_than_undo_history& e)
2038  {
2039  synopsis_exception = e;
2040  }
2041  if (synopsis_exception)
2042  disconnect_from_peer(peer, "You are on a fork I'm unable to switch to");
2043  }
2044 
2046  const blockchain_item_ids_inventory_message& blockchain_item_ids_inventory_message_received )
2047  {
2049  // ignore unless we asked for the data
2050  if( originating_peer->item_ids_requested_from_peer )
2051  {
2052  // verify that the peer's the block ids the peer sent is a valid response to our request;
2053  // It should either be an empty list of blocks, or a list of blocks that builds off of one of
2054  // the blocks in the synopsis we sent
2055  if (!blockchain_item_ids_inventory_message_received.item_hashes_available.empty())
2056  {
2057  // what's more, it should be a sequential list of blocks, verify that first
2058  uint32_t first_block_number_in_reponse = _delegate->get_block_number(blockchain_item_ids_inventory_message_received.item_hashes_available.front());
2059  for (unsigned i = 1; i < blockchain_item_ids_inventory_message_received.item_hashes_available.size(); ++i)
2060  {
2061  uint32_t actual_num = _delegate->get_block_number(blockchain_item_ids_inventory_message_received.item_hashes_available[i]);
2062  uint32_t expected_num = first_block_number_in_reponse + i;
2063  if (actual_num != expected_num)
2064  {
2065  wlog("Invalid response from peer ${peer_endpoint}. The list of blocks they provided is not sequential, "
2066  "the ${position}th block in their reply was block number ${actual_num}, "
2067  "but it should have been number ${expected_num}",
2068  ("peer_endpoint", originating_peer->get_remote_endpoint())
2069  ("position", i)
2070  ("actual_num", actual_num)
2071  ("expected_num", expected_num));
2072  fc::exception error_for_peer(FC_LOG_MESSAGE(error,
2073  "You gave an invalid response to my request for sync blocks. The list of blocks you provided is not sequential, "
2074  "the ${position}th block in their reply was block number ${actual_num}, "
2075  "but it should have been number ${expected_num}",
2076  ("position", i)
2077  ("actual_num", actual_num)
2078  ("expected_num", expected_num)));
2079  disconnect_from_peer(originating_peer,
2080  "You gave an invalid response to my request for sync blocks",
2081  true, error_for_peer);
2082  return;
2083  }
2084  }
2085 
2086  const std::vector<item_hash_t>& synopsis_sent_in_request = originating_peer->item_ids_requested_from_peer->get<0>();
2087  const item_hash_t& first_item_hash = blockchain_item_ids_inventory_message_received.item_hashes_available.front();
2088 
2089  if (synopsis_sent_in_request.empty())
2090  {
2091  // if we sent an empty synopsis, we were asking for all blocks, so the first block should be block 1
2092  if (_delegate->get_block_number(first_item_hash) != 1)
2093  {
2094  wlog("Invalid response from peer ${peer_endpoint}. We requested a list of sync blocks starting from the beginning of the chain, "
2095  "but they provided a list of blocks starting with ${first_block}",
2096  ("peer_endpoint", originating_peer->get_remote_endpoint())
2097  ("first_block", first_item_hash));
2098  fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You gave an invalid response for my request for sync blocks. I asked for blocks starting from the beginning of the chain, "
2099  "but you returned a list of blocks starting with ${first_block}",
2100  ("first_block", first_item_hash)));
2101  disconnect_from_peer(originating_peer,
2102  "You gave an invalid response to my request for sync blocks",
2103  true, error_for_peer);
2104  return;
2105  }
2106  }
2107  else // synopsis was not empty, we expect a response building off one of the blocks we sent
2108  {
2109  if (boost::range::find(synopsis_sent_in_request, first_item_hash) == synopsis_sent_in_request.end())
2110  {
2111  wlog("Invalid response from peer ${peer_endpoint}. We requested a list of sync blocks based on the synopsis ${synopsis}, but they "
2112  "provided a list of blocks starting with ${first_block}",
2113  ("peer_endpoint", originating_peer->get_remote_endpoint())
2114  ("synopsis", synopsis_sent_in_request)
2115  ("first_block", first_item_hash));
2116  fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You gave an invalid response for my request for sync blocks. I asked for blocks following something in "
2117  "${synopsis}, but you returned a list of blocks starting with ${first_block} which wasn't one of your choices",
2118  ("synopsis", synopsis_sent_in_request)
2119  ("first_block", first_item_hash)));
2120  disconnect_from_peer(originating_peer,
2121  "You gave an invalid response to my request for sync blocks",
2122  true, error_for_peer);
2123  return;
2124  }
2125  }
2126  }
2127  originating_peer->item_ids_requested_from_peer.reset();
2128 
2129  // if exceptions are throw after clearing the item_ids_requested_from_peer (above),
2130  // it could leave our sync in a stalled state. Wrap a try/catch around the rest
2131  // of the function so we can log if this ever happens.
2132  try
2133  {
2134  dlog( "sync: received a list of ${count} available items from ${peer_endpoint}",
2135  ( "count", blockchain_item_ids_inventory_message_received.item_hashes_available.size() )
2136  ( "peer_endpoint", originating_peer->get_remote_endpoint() ) );
2137  //for( const item_hash_t& item_hash : blockchain_item_ids_inventory_message_received.item_hashes_available )
2138  //{
2139  // dlog( "sync: ${hash}", ("hash", item_hash ) );
2140  //}
2141 
2142  // if the peer doesn't have any items after the one we asked for
2143  if( blockchain_item_ids_inventory_message_received.total_remaining_item_count == 0 &&
2144  ( blockchain_item_ids_inventory_message_received.item_hashes_available.empty() || // there are no items in the peer's blockchain. this should only happen if our blockchain was empty when we requested, might want to verify that.
2145  ( blockchain_item_ids_inventory_message_received.item_hashes_available.size() == 1 &&
2146  _delegate->has_item( item_id(blockchain_item_ids_inventory_message_received.item_type,
2147  blockchain_item_ids_inventory_message_received.item_hashes_available.front() ) ) ) ) && // we've already seen the last item in the peer's blockchain
2148  originating_peer->ids_of_items_to_get.empty() &&
2149  originating_peer->number_of_unfetched_item_ids == 0 ) // <-- is the last check necessary?
2150  {
2151  dlog( "sync: peer said we're up-to-date, entering normal operation with this peer" );
2152  originating_peer->we_need_sync_items_from_peer = false;
2153 
2154  uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers();
2155  _total_number_of_unfetched_items = new_number_of_unfetched_items;
2156  if( new_number_of_unfetched_items == 0 )
2157  _delegate->sync_status( blockchain_item_ids_inventory_message_received.item_type, 0 );
2158 
2159  return;
2160  }
2161 
2162  std::deque<item_hash_t> item_hashes_received( blockchain_item_ids_inventory_message_received.item_hashes_available.begin(),
2163  blockchain_item_ids_inventory_message_received.item_hashes_available.end() );
2164  originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count;
2165  // flush any items this peer sent us that we've already received and processed from another peer
2166  if (!item_hashes_received.empty() &&
2167  originating_peer->ids_of_items_to_get.empty())
2168  {
2169  bool is_first_item_for_other_peer = false;
2170  {
2172  for (const peer_connection_ptr& peer : _active_connections)
2173  {
2174  if (peer != originating_peer->shared_from_this() &&
2175  !peer->ids_of_items_to_get.empty() &&
2176  peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front())
2177  {
2178  dlog("The item ${newitem} is the first item for peer ${peer}",
2179  ("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front())
2180  ("peer", peer->get_remote_endpoint()));
2181  is_first_item_for_other_peer = true;
2182  break;
2183  }
2184  }
2185  }
2186  dlog("is_first_item_for_other_peer: ${is_first}. item_hashes_received.size() = ${size}",
2187  ("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size()));
2188  if (!is_first_item_for_other_peer)
2189  {
2190  while (!item_hashes_received.empty() &&
2191  _delegate->has_item(item_id(blockchain_item_ids_inventory_message_received.item_type,
2192  item_hashes_received.front())))
2193  {
2194  assert(item_hashes_received.front() != item_hash_t());
2195  originating_peer->last_block_delegate_has_seen = item_hashes_received.front();
2196  originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front());
2197  dlog("popping item because delegate has already seen it. peer ${peer}'s last block the delegate has seen is now ${block_id} (actual block #${actual_block_num})",
2198  ("peer", originating_peer->get_remote_endpoint())
2199  ("block_id", originating_peer->last_block_delegate_has_seen)
2200  ("actual_block_num", _delegate->get_block_number(item_hashes_received.front())));
2201 
2202  item_hashes_received.pop_front();
2203  }
2204  dlog("after removing all items we have already seen, item_hashes_received.size() = ${size}", ("size", item_hashes_received.size()));
2205  }
2206  }
2207  else if (!item_hashes_received.empty())
2208  {
2209  // we received a list of items and we already have a list of items to fetch from this peer.
2210  // In the normal case, this list will immediately follow the existing list, meaning the
2211  // last hash of our existing list will match the first hash of the new list.
2212 
2213  // In the much less likely case, we've received a partial list of items from the peer, then
2214  // the peer switched forks before sending us the remaining list. In this case, the first
2215  // hash in the new list may not be the last hash in the existing list (it may be earlier, or
2216  // it may not exist at all.
2217 
2218  // In either case, pop items off the back of our existing list until we find our first
2219  // item, then append our list.
2220  while (!originating_peer->ids_of_items_to_get.empty())
2221  {
2222  if (item_hashes_received.front() != originating_peer->ids_of_items_to_get.back())
2223  originating_peer->ids_of_items_to_get.pop_back();
2224  else
2225  break;
2226  }
2227  if (originating_peer->ids_of_items_to_get.empty())
2228  {
2229  // this happens when the peer has switched forks between the last inventory message and
2230  // this one, and there weren't any unfetched items in common
2231  // We don't know where in the blockchain the new front() actually falls, all we can
2232  // expect is that it is a block that we knew about because it should be one of the
2233  // blocks we sent in the initial synopsis.
2234  assert(_delegate->has_item(item_id(_sync_item_type, item_hashes_received.front())));
2235  originating_peer->last_block_delegate_has_seen = item_hashes_received.front();
2236  originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front());
2237  item_hashes_received.pop_front();
2238  }
2239  else
2240  {
2241  // the common simple case: the new list extends the old. pop off the duplicate element
2242  originating_peer->ids_of_items_to_get.pop_back();
2243  }
2244  }
2245 
2246  if (!item_hashes_received.empty() && !originating_peer->ids_of_items_to_get.empty())
2247  assert(item_hashes_received.front() != originating_peer->ids_of_items_to_get.back());
2248 
2249  // at any given time, there's a maximum number of blocks that can possibly be out there
2250  // [(now - genesis time) / block interval]. If they offer us more blocks than that,
2251  // they must be an attacker or have a buggy client.
2252  fc::time_point_sec minimum_time_of_last_offered_block =
2253  originating_peer->last_block_time_delegate_has_seen + // timestamp of the block immediately before the first unfetched block
2256  if (minimum_time_of_last_offered_block > now + GRAPHENE_NET_FUTURE_SYNC_BLOCKS_GRACE_PERIOD_SEC)
2257  {
2258  wlog("Disconnecting from peer ${peer} who offered us an implausible number of blocks, their last block would be in the future (${timestamp})",
2259  ("peer", originating_peer->get_remote_endpoint())
2260  ("timestamp", minimum_time_of_last_offered_block));
2261  fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You offered me a list of more sync blocks than could possibly exist. Total blocks offered: ${blocks}, Minimum time of the last block you offered: ${minimum_time_of_last_offered_block}, Now: ${now}",
2262  ("blocks", originating_peer->number_of_unfetched_item_ids)
2263  ("minimum_time_of_last_offered_block", minimum_time_of_last_offered_block)
2264  ("now", now)));
2265  disconnect_from_peer(originating_peer,
2266  "You offered me a list of more sync blocks than could possibly exist",
2267  true, error_for_peer);
2268  return;
2269  }
2270 
2271  // append the remaining items to the peer's list
2272  boost::push_back(originating_peer->ids_of_items_to_get, item_hashes_received);
2273 
2274  uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers();
2275  if (new_number_of_unfetched_items != _total_number_of_unfetched_items)
2276  _delegate->sync_status(blockchain_item_ids_inventory_message_received.item_type,
2277  new_number_of_unfetched_items);
2278  _total_number_of_unfetched_items = new_number_of_unfetched_items;
2279 
2280  if (blockchain_item_ids_inventory_message_received.total_remaining_item_count != 0)
2281  {
2282  // the peer hasn't sent us all the items it knows about.
2283  if (originating_peer->ids_of_items_to_get.size() > GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH)
2284  {
2285  // we have a good number of item ids from this peer, start fetching blocks from it;
2286  // we'll switch back later to finish the job.
2288  }
2289  else
2290  {
2291  // keep fetching the peer's list of sync items until we get enough to switch into block-
2292  // fetchimg mode
2293  fetch_next_batch_of_item_ids_from_peer(originating_peer);
2294  }
2295  }
2296  else
2297  {
2298  // the peer has told us about all of the items it knows
2299  if (!originating_peer->ids_of_items_to_get.empty())
2300  {
2301  // we now know about all of the items the peer knows about, and there are some items on the list
2302  // that we should try to fetch. Kick off the fetch loop.
2304  }
2305  else
2306  {
2307  // If we get here, the peer has sent us a non-empty list of items, but we have already
2308  // received all of the items from other peers. Send a new request to the peer to
2309  // see if we're really in sync
2310  fetch_next_batch_of_item_ids_from_peer(originating_peer);
2311  }
2312  }
2313  }
2314  catch (const fc::canceled_exception&)
2315  {
2316  throw;
2317  }
2318  catch (const fc::exception& e)
2319  {
2320  elog("Caught unexpected exception: ${e}", ("e", e));
2321  assert(false && "exceptions not expected here");
2322  }
2323  catch (const std::exception& e)
2324  {
2325  elog("Caught unexpected exception: ${e}", ("e", e.what()));
2326  assert(false && "exceptions not expected here");
2327  }
2328  catch (...)
2329  {
2330  elog("Caught unexpected exception, could break sync operation");
2331  }
2332  }
2333  else
2334  {
2335  wlog("sync: received a list of sync items available, but I didn't ask for any!");
2336  }
2337  }
2338 
2340  {
2341  try
2342  {
2343  return _message_cache.get_message(item.item_hash);
2344  }
2345  catch (fc::key_not_found_exception&)
2346  {}
2347  try
2348  {
2349  return _delegate->get_item(item);
2350  }
2351  catch (fc::key_not_found_exception&)
2352  {}
2353  return item_not_available_message(item);
2354  }
2355 
2356  void node_impl::on_fetch_items_message(peer_connection* originating_peer, const fetch_items_message& fetch_items_message_received)
2357  {
2359  dlog("received items request for ids ${ids} of type ${type} from peer ${endpoint}",
2360  ("ids", fetch_items_message_received.items_to_fetch)
2361  ("type", fetch_items_message_received.item_type)
2362  ("endpoint", originating_peer->get_remote_endpoint()));
2363 
2364  fc::optional<message> last_block_message_sent;
2365 
2366  std::list<message> reply_messages;
2367  for (const item_hash_t& item_hash : fetch_items_message_received.items_to_fetch)
2368  {
2369  try
2370  {
2371  message requested_message = _message_cache.get_message(item_hash);
2372  dlog("received item request for item ${id} from peer ${endpoint}, returning the item from my message cache",
2373  ("endpoint", originating_peer->get_remote_endpoint())
2374  ("id", requested_message.id()));
2375  reply_messages.push_back(requested_message);
2376  if (fetch_items_message_received.item_type == block_message_type)
2377  last_block_message_sent = requested_message;
2378  continue;
2379  }
2380  catch (fc::key_not_found_exception&)
2381  {
2382  // it wasn't in our local cache, that's ok ask the client
2383  }
2384 
2385  item_id item_to_fetch(fetch_items_message_received.item_type, item_hash);
2386  try
2387  {
2388  message requested_message = _delegate->get_item(item_to_fetch);
2389  dlog("received item request from peer ${endpoint}, returning the item from delegate with id ${id} size ${size}",
2390  ("id", requested_message.id())
2391  ("size", requested_message.size)
2392  ("endpoint", originating_peer->get_remote_endpoint()));
2393  reply_messages.push_back(requested_message);
2394  if (fetch_items_message_received.item_type == block_message_type)
2395  last_block_message_sent = requested_message;
2396  continue;
2397  }
2398  catch (fc::key_not_found_exception&)
2399  {
2400  reply_messages.push_back(item_not_available_message(item_to_fetch));
2401  dlog("received item request from peer ${endpoint} but we don't have it",
2402  ("endpoint", originating_peer->get_remote_endpoint()));
2403  }
2404  }
2405 
2406  // if we sent them a block, update our record of the last block they've seen accordingly
2407  if (last_block_message_sent)
2408  {
2409  graphene::net::block_message block = last_block_message_sent->as<graphene::net::block_message>();
2410  originating_peer->last_block_delegate_has_seen = block.block_id;
2411  originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(block.block_id);
2412  }
2413 
2414  for (const message& reply : reply_messages)
2415  {
2416  if (reply.msg_type.value() == block_message_type)
2418  else
2419  originating_peer->send_message(reply);
2420  }
2421  }
2422 
2423  void node_impl::on_item_not_available_message( peer_connection* originating_peer, const item_not_available_message& item_not_available_message_received )
2424  {
2426  const item_id& requested_item = item_not_available_message_received.requested_item;
2427  auto regular_item_iter = originating_peer->items_requested_from_peer.find(requested_item);
2428  if (regular_item_iter != originating_peer->items_requested_from_peer.end())
2429  {
2430  originating_peer->items_requested_from_peer.erase( regular_item_iter );
2431  originating_peer->inventory_peer_advertised_to_us.erase( requested_item );
2432  if (is_item_in_any_peers_inventory(requested_item))
2434  wlog("Peer doesn't have the requested item.");
2436  return;
2437  }
2438 
2439  auto sync_item_iter = originating_peer->sync_items_requested_from_peer.find(requested_item.item_hash);
2440  if (sync_item_iter != originating_peer->sync_items_requested_from_peer.end())
2441  {
2442  originating_peer->sync_items_requested_from_peer.erase(sync_item_iter);
2443 
2444  if (originating_peer->peer_needs_sync_items_from_us)
2445  originating_peer->inhibit_fetching_sync_blocks = true;
2446  else
2447  disconnect_from_peer(originating_peer, "You are missing a sync item you claim to have, your database is probably corrupted. Try --rebuild-index.",true,
2448  fc::exception(FC_LOG_MESSAGE(error,"You are missing a sync item you claim to have, your database is probably corrupted. Try --rebuild-index.",
2449  ("item_id", requested_item))));
2450  wlog("Peer doesn't have the requested sync item. This really shouldn't happen");
2452  return;
2453  }
2454 
2455  dlog("Peer doesn't have an item we're looking for, which is fine because we weren't looking for it");
2456  }
2457 
2458  void node_impl::on_item_ids_inventory_message(peer_connection* originating_peer, const item_ids_inventory_message& item_ids_inventory_message_received)
2459  {
2461 
2462  // expire old inventory so we'll be making decisions our about whether to fetch blocks below based only on recent inventory
2463  originating_peer->clear_old_inventory();
2464 
2465  dlog( "received inventory of ${count} items from peer ${endpoint}",
2466  ( "count", item_ids_inventory_message_received.item_hashes_available.size() )("endpoint", originating_peer->get_remote_endpoint() ) );
2467  for( const item_hash_t& item_hash : item_ids_inventory_message_received.item_hashes_available )
2468  {
2469  item_id advertised_item_id(item_ids_inventory_message_received.item_type, item_hash);
2470  bool we_advertised_this_item_to_a_peer = false;
2471  bool we_requested_this_item_from_a_peer = false;
2472  {
2474  for (const peer_connection_ptr peer : _active_connections)
2475  {
2476  if (peer->inventory_advertised_to_peer.find(advertised_item_id) != peer->inventory_advertised_to_peer.end())
2477  {
2478  we_advertised_this_item_to_a_peer = true;
2479  break;
2480  }
2481  if (peer->items_requested_from_peer.find(advertised_item_id) != peer->items_requested_from_peer.end())
2482  we_requested_this_item_from_a_peer = true;
2483  }
2484  }
2485 
2486  // if we have already advertised it to a peer, we must have it, no need to do anything else
2487  if (!we_advertised_this_item_to_a_peer)
2488  {
2489  // if the peer has flooded us with transactions, don't add these to the inventory to prevent our
2490  // inventory list from growing without bound. We try to allow fetching blocks even when
2491  // we've stopped fetching transactions.
2492  if ((item_ids_inventory_message_received.item_type == graphene::net::trx_message_type &&
2494  originating_peer->is_inventory_advertised_to_us_list_full())
2495  break;
2496  originating_peer->inventory_peer_advertised_to_us.insert(peer_connection::timestamped_item_id(advertised_item_id, fc::time_point::now()));
2497  if (!we_requested_this_item_from_a_peer)
2498  {
2499  if (_recently_failed_items.find(item_id(item_ids_inventory_message_received.item_type, item_hash)) != _recently_failed_items.end())
2500  {
2501  dlog("not adding ${item_hash} to our list of items to fetch because we've recently fetched a copy and it failed to push",
2502  ("item_hash", item_hash));
2503  }
2504  else
2505  {
2506  auto items_to_fetch_iter = _items_to_fetch.get<item_id_index>().find(advertised_item_id);
2507  if (items_to_fetch_iter == _items_to_fetch.get<item_id_index>().end())
2508  {
2509  // it's new to us
2511  dlog("adding item ${item_hash} from inventory message to our list of items to fetch",
2512  ("item_hash", item_hash));
2514  }
2515  else
2516  {
2517  // another peer has told us about this item already, but this peer just told us it has the item
2518  // too, we can expect it to be around in this peer's cache for longer, so update its timestamp
2519  _items_to_fetch.get<item_id_index>().modify(items_to_fetch_iter,
2520  [](prioritized_item_id& item) { item.timestamp = fc::time_point::now(); });
2521  }
2522  }
2523  }
2524  }
2525  }
2526 
2527  }
2528 
2530  const closing_connection_message& closing_connection_message_received )
2531  {
2533  originating_peer->they_have_requested_close = true;
2534 
2535  if( closing_connection_message_received.closing_due_to_error )
2536  {
2537  wlog( "Peer ${peer} is disconnecting us because of an error: ${msg}, exception: ${error}",
2538  ( "peer", originating_peer->get_remote_endpoint() )
2539  ( "msg", closing_connection_message_received.reason_for_closing )
2540  ( "error", closing_connection_message_received.error ) );
2541  std::ostringstream message;
2542  message << "Peer " << fc::variant( originating_peer->get_remote_endpoint(),
2543  GRAPHENE_NET_MAX_NESTED_OBJECTS ).as_string() <<
2544  " disconnected us: " << closing_connection_message_received.reason_for_closing;
2545  fc::exception detailed_error(FC_LOG_MESSAGE(warn,
2546  "Peer ${peer} is disconnecting us because of an error: ${msg}, exception: ${error}",
2547  ( "peer", originating_peer->get_remote_endpoint() )
2548  ( "msg", closing_connection_message_received.reason_for_closing )
2549  ( "error", closing_connection_message_received.error ) ));
2550  _delegate->error_encountered( message.str(),
2551  detailed_error );
2552  }
2553  else
2554  {
2555  wlog( "Peer ${peer} is disconnecting us because: ${msg}",
2556  ( "peer", originating_peer->get_remote_endpoint() )
2557  ( "msg", closing_connection_message_received.reason_for_closing ) );
2558  }
2559  if( originating_peer->we_have_requested_close )
2560  originating_peer->close_connection();
2561  }
2562 
2564  {
2566  peer_connection_ptr originating_peer_ptr = originating_peer->shared_from_this();
2567  _rate_limiter.remove_tcp_socket( &originating_peer->get_socket() );
2568 
2569  // if we closed the connection (due to timeout or handshake failure), we should have recorded an
2570  // error message to store in the peer database when we closed the connection
2571  fc::optional<fc::ip::endpoint> inbound_endpoint = originating_peer->get_endpoint_for_connecting();
2572  if (originating_peer->connection_closed_error && inbound_endpoint)
2573  {
2574  fc::optional<potential_peer_record> updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint);
2575  if (updated_peer_record)
2576  {
2577  updated_peer_record->last_error = *originating_peer->connection_closed_error;
2578  _potential_peer_db.update_entry(*updated_peer_record);
2579  }
2580  }
2581 
2582  _closing_connections.erase(originating_peer_ptr);
2583  _handshaking_connections.erase(originating_peer_ptr);
2584  _terminating_connections.erase(originating_peer_ptr);
2585  if (_active_connections.find(originating_peer_ptr) != _active_connections.end())
2586  {
2587  _active_connections.erase(originating_peer_ptr);
2588 
2589  if (inbound_endpoint && originating_peer_ptr->get_remote_endpoint())
2590  {
2591  fc::optional<potential_peer_record> updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint);
2592  if (updated_peer_record)
2593  {
2594  updated_peer_record->last_seen_time = fc::time_point::now();
2595  _potential_peer_db.update_entry(*updated_peer_record);
2596  }
2597  }
2598  }
2599 
2600  ilog("Remote peer ${endpoint} closed their connection to us", ("endpoint", originating_peer->get_remote_endpoint()));
2603 
2604  // notify the node delegate so it can update the display
2606  {
2608  _delegate->connection_count_changed( _last_reported_number_of_connections );
2609  }
2610 
2611  // if we had delegated a firewall check to this peer, send it to another peer
2612  if (originating_peer->firewall_check_state)
2613  {
2614  if (originating_peer->firewall_check_state->requesting_peer != node_id_t())
2615  {
2616  // it's a check we're doing for another node
2617  firewall_check_state_data* firewall_check_state = originating_peer->firewall_check_state;
2618  originating_peer->firewall_check_state = nullptr;
2619  forward_firewall_check_to_next_available_peer(firewall_check_state);
2620  }
2621  else
2622  {
2623  // we were asking them to check whether we're firewalled. we'll just let it
2624  // go for now
2625  delete originating_peer->firewall_check_state;
2626  }
2627  }
2628 
2629  // if we had requested any sync or regular items from this peer that we haven't
2630  // received yet, reschedule them to be fetched from another peer
2631  if (!originating_peer->sync_items_requested_from_peer.empty())
2632  {
2633  for (auto sync_item : originating_peer->sync_items_requested_from_peer)
2634  _active_sync_requests.erase(sync_item);
2636  }
2637 
2638  if (!originating_peer->items_requested_from_peer.empty())
2639  {
2640  for (auto item_and_time : originating_peer->items_requested_from_peer)
2641  {
2642  if (is_item_in_any_peers_inventory(item_and_time.first))
2644  }
2646  }
2647 
2648  schedule_peer_for_deletion(originating_peer_ptr);
2649  }
2650 
2652  {
2653  dlog("in send_sync_block_to_node_delegate()");
2654  bool client_accepted_block = false;
2655  bool discontinue_fetching_blocks_from_peer = false;
2656 
2657  fc::oexception handle_message_exception;
2658 
2659  try
2660  {
2661  std::vector<fc::uint160_t> contained_transaction_message_ids;
2662  _delegate->handle_block(block_message_to_send, true, contained_transaction_message_ids);
2663  ilog("Successfully pushed sync block ${num} (id:${id})",
2664  ("num", block_message_to_send.block.block_num())
2665  ("id", block_message_to_send.block_id));
2666  _most_recent_blocks_accepted.push_back(block_message_to_send.block_id);
2667 
2668  client_accepted_block = true;
2669  }
2670  catch (const block_older_than_undo_history& e)
2671  {
2672  wlog("Failed to push sync block ${num} (id:${id}): block is on a fork older than our undo history would "
2673  "allow us to switch to: ${e}",
2674  ("num", block_message_to_send.block.block_num())
2675  ("id", block_message_to_send.block_id)
2676  ("e", (fc::exception)e));
2677  handle_message_exception = e;
2678  discontinue_fetching_blocks_from_peer = true;
2679  }
2680  catch (const fc::canceled_exception&)
2681  {
2682  throw;
2683  }
2684  catch (const fc::exception& e)
2685  {
2686  auto block_num = block_message_to_send.block.block_num();
2687  wlog("Failed to push sync block ${num} (id:${id}): client rejected sync block sent by peer: ${e}",
2688  ("num", block_num)
2689  ("id", block_message_to_send.block_id)
2690  ("e", e));
2691  if( e.code() == block_timestamp_in_future_exception::code_enum::code_value )
2692  {
2693  handle_message_exception = block_timestamp_in_future_exception( FC_LOG_MESSAGE( warn, "",
2694  ("block_header", static_cast<graphene::protocol::block_header>(block_message_to_send.block))
2695  ("block_num", block_num)
2696  ("block_id", block_message_to_send.block_id) ) );
2697  }
2698  else
2699  handle_message_exception = e;
2700  }
2701 
2702  // build up lists for any potentially-blocking operations we need to do, then do them
2703  // at the end of this function
2704  std::set<peer_connection_ptr> peers_with_newly_empty_item_lists;
2705  std::set<peer_connection_ptr> peers_we_need_to_sync_to;
2706  std::map<peer_connection_ptr, std::pair<std::string, fc::oexception> > peers_to_disconnect; // map peer -> pair<reason_string, exception>
2707 
2708  if( client_accepted_block )
2709  {
2711  dlog("sync: client accpted the block, we now have only ${count} items left to fetch before we're in sync",
2713  bool is_fork_block = is_hard_fork_block(block_message_to_send.block.block_num());
2714  {
2716 
2717  for (const peer_connection_ptr& peer : _active_connections)
2718  {
2719  bool disconnecting_this_peer = false;
2720  if (is_fork_block)
2721  {
2722  // we just pushed a hard fork block. Find out if this peer is running a client
2723  // that will be unable to process future blocks
2724  if (peer->last_known_fork_block_number != 0)
2725  {
2726  uint32_t next_fork_block_number = get_next_known_hard_fork_block_number(peer->last_known_fork_block_number);
2727  if (next_fork_block_number != 0 &&
2728  next_fork_block_number <= block_message_to_send.block.block_num())
2729  {
2730  std::ostringstream disconnect_reason_stream;
2731  disconnect_reason_stream << "You need to upgrade your client due to hard fork at block " << block_message_to_send.block.block_num();
2732  peers_to_disconnect[peer] = std::make_pair(disconnect_reason_stream.str(),
2733  fc::oexception(fc::exception(FC_LOG_MESSAGE(error, "You need to upgrade your client due to hard fork at block ${block_number}",
2734  ("block_number", block_message_to_send.block.block_num())))));
2735 #ifdef ENABLE_DEBUG_ULOGS
2736  ulog("Disconnecting from peer during sync because their version is too old. Their version date: ${date}", ("date", peer->graphene_git_revision_unix_timestamp));
2737 #endif
2738  disconnecting_this_peer = true;
2739  }
2740  }
2741  }
2742  if (!disconnecting_this_peer &&
2743  peer->ids_of_items_to_get.empty() && peer->ids_of_items_being_processed.empty())
2744  {
2745  dlog( "Cannot pop first element off peer ${peer}'s list, its list is empty", ("peer", peer->get_remote_endpoint() ) );
2746  // we don't know for sure that this peer has the item we just received.
2747  // If peer is still syncing to us, we know they will ask us for
2748  // sync item ids at least one more time and we'll notify them about
2749  // the item then, so there's no need to do anything. If we still need items
2750  // from them, we'll be asking them for more items at some point, and
2751  // that will clue them in that they are out of sync. If we're fully in sync
2752  // we need to kick off another round of synchronization with them so they can
2753  // find out about the new item.
2754  if (!peer->peer_needs_sync_items_from_us && !peer->we_need_sync_items_from_peer)
2755  {
2756  dlog("We will be restarting synchronization with peer ${peer}", ("peer", peer->get_remote_endpoint()));
2757  peers_we_need_to_sync_to.insert(peer);
2758  }
2759  }
2760  else if (!disconnecting_this_peer)
2761  {
2762  auto items_being_processed_iter = peer->ids_of_items_being_processed.find(block_message_to_send.block_id);
2763  if (items_being_processed_iter != peer->ids_of_items_being_processed.end())
2764  {
2765  peer->last_block_delegate_has_seen = block_message_to_send.block_id;
2766  peer->last_block_time_delegate_has_seen = block_message_to_send.block.timestamp;
2767 
2768  peer->ids_of_items_being_processed.erase(items_being_processed_iter);
2769  dlog("Removed item from ${endpoint}'s list of items being processed, still processing ${len} blocks",
2770  ("endpoint", peer->get_remote_endpoint())("len", peer->ids_of_items_being_processed.size()));
2771 
2772  // if we just received the last item in our list from this peer, we will want to
2773  // send another request to find out if we are in sync, but we can't do this yet
2774  // (we don't want to allow a fiber swap in the middle of popping items off the list)
2775  if (peer->ids_of_items_to_get.empty() &&
2776  peer->number_of_unfetched_item_ids == 0 &&
2777  peer->ids_of_items_being_processed.empty())
2778  peers_with_newly_empty_item_lists.insert(peer);
2779 
2780  // in this case, we know the peer was offering us this exact item, no need to
2781  // try to inform them of its existence
2782  }
2783  }
2784  } // for
2785  } // lock_guard
2786  }
2787  else
2788  {
2789  // invalid message received
2791  for (const peer_connection_ptr& peer : _active_connections)
2792  {
2793  if (peer->ids_of_items_being_processed.find(block_message_to_send.block_id)
2794  != peer->ids_of_items_being_processed.end())
2795  {
2796  if (discontinue_fetching_blocks_from_peer)
2797  {
2798  wlog("inhibiting fetching sync blocks from peer ${endpoint} because it is on a fork that's too old",
2799  ("endpoint", peer->get_remote_endpoint()));
2800  peer->inhibit_fetching_sync_blocks = true;
2801  }
2802  else
2803  peers_to_disconnect[peer] = std::make_pair(
2804  std::string("You offered us a block that we reject as invalid"),
2805  fc::oexception(handle_message_exception));
2806  }
2807  }
2808  }
2809 
2810  for (auto& peer_to_disconnect : peers_to_disconnect)
2811  {
2812  const peer_connection_ptr& peer = peer_to_disconnect.first;
2813  std::string reason_string;
2814  fc::oexception reason_exception;
2815  std::tie(reason_string, reason_exception) = peer_to_disconnect.second;
2816  wlog("disconnecting client ${endpoint} because it offered us the rejected block",
2817  ("endpoint", peer->get_remote_endpoint()));
2818  disconnect_from_peer(peer.get(), reason_string, true, reason_exception);
2819  }
2820  for (const peer_connection_ptr& peer : peers_with_newly_empty_item_lists)
2822 
2823  for (const peer_connection_ptr& peer : peers_we_need_to_sync_to)
2825 
2826  dlog("Leaving send_sync_block_to_node_delegate");
2827 
2828  if (// _suspend_fetching_sync_blocks && <-- you can use this if "maximum_number_of_blocks_to_handle_at_one_time" == "maximum_number_of_sync_blocks_to_prefetch"
2832  "process_backlog_of_sync_blocks");
2833  }
2834 
2836  {
2838  // garbage-collect the list of async tasks here for lack of a better place
2839  for (auto calls_iter = _handle_message_calls_in_progress.begin();
2840  calls_iter != _handle_message_calls_in_progress.end();)
2841  {
2842  if (calls_iter->ready())
2843  calls_iter = _handle_message_calls_in_progress.erase(calls_iter);
2844  else
2845  ++calls_iter;
2846  }
2847 
2848  dlog("in process_backlog_of_sync_blocks");
2850  {
2851  dlog("leaving process_backlog_of_sync_blocks because we're already processing too many blocks");
2852  return; // we will be rescheduled when the next block finishes its processing
2853  }
2854  dlog("currently ${count} blocks in the process of being handled", ("count", _handle_message_calls_in_progress.size()));
2855 
2856 
2858  {
2859  dlog("resuming processing sync block backlog because we only ${count} blocks in progress",
2860  ("count", _handle_message_calls_in_progress.size()));
2862  }
2863 
2864 
2865  // when syncing with multiple peers, it's possible that we'll have hundreds of blocks ready to push
2866  // to the client at once. This can be slow, and we need to limit the number we push at any given
2867  // time to allow network traffic to continue so we don't end up disconnecting from peers
2868  //fc::time_point start_time = fc::time_point::now();
2869  //fc::time_point when_we_should_yield = start_time + fc::seconds(1);
2870 
2871  bool block_processed_this_iteration;
2872  unsigned blocks_processed = 0;
2873 
2874  std::set<peer_connection_ptr> peers_with_newly_empty_item_lists;
2875  std::set<peer_connection_ptr> peers_we_need_to_sync_to;
2876  std::map<peer_connection_ptr, fc::oexception> peers_with_rejected_block;
2877 
2878  do
2879  {
2880  std::copy(std::make_move_iterator(_new_received_sync_items.begin()),
2881  std::make_move_iterator(_new_received_sync_items.end()),
2882  std::front_inserter(_received_sync_items));
2883  _new_received_sync_items.clear();
2884  dlog("currently ${count} sync items to consider", ("count", _received_sync_items.size()));
2885 
2886  block_processed_this_iteration = false;
2887  for (auto received_block_iter = _received_sync_items.begin();
2888  received_block_iter != _received_sync_items.end();
2889  ++received_block_iter)
2890  {
2891 
2892  // find out if this block is the next block on the active chain or one of the forks
2893  bool potential_first_block = false;
2894  {
2896  for (const peer_connection_ptr& peer : _active_connections)
2897  {
2898  if (!peer->ids_of_items_to_get.empty() &&
2899  peer->ids_of_items_to_get.front() == received_block_iter->block_id)
2900  {
2901  potential_first_block = true;
2902  peer->ids_of_items_to_get.pop_front();
2903  peer->ids_of_items_being_processed.insert(received_block_iter->block_id);
2904  }
2905  }
2906  }
2907 
2908  // if it is, process it, remove it from all sync peers lists
2909  if (potential_first_block)
2910  {
2911  // we can get into an interesting situation near the end of synchronization. We can be in
2912  // sync with one peer who is sending us the last block on the chain via a regular inventory
2913  // message, while at the same time still be synchronizing with a peer who is sending us the
2914  // block through the sync mechanism. Further, we must request both blocks because
2915  // we don't know they're the same (for the peer in normal operation, it has only told us the
2916  // message id, for the peer in the sync case we only known the block_id).
2917  if (std::find(_most_recent_blocks_accepted.begin(), _most_recent_blocks_accepted.end(),
2918  received_block_iter->block_id) == _most_recent_blocks_accepted.end())
2919  {
2920  graphene::net::block_message block_message_to_process = *received_block_iter;
2921  _received_sync_items.erase(received_block_iter);
2922  _handle_message_calls_in_progress.emplace_back(fc::async([this, block_message_to_process](){
2923  send_sync_block_to_node_delegate(block_message_to_process);
2924  }, "send_sync_block_to_node_delegate"));
2925  ++blocks_processed;
2926  block_processed_this_iteration = true;
2927  }
2928  else
2929  {
2930  dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted");
2931  std::vector< peer_connection_ptr > peers_needing_next_batch;
2933  for (const peer_connection_ptr& peer : _active_connections)
2934  {
2935  auto items_being_processed_iter = peer->ids_of_items_being_processed.find(received_block_iter->block_id);
2936  if (items_being_processed_iter != peer->ids_of_items_being_processed.end())
2937  {
2938  peer->ids_of_items_being_processed.erase(items_being_processed_iter);
2939  dlog("Removed item from ${endpoint}'s list of items being processed, still processing ${len} blocks",
2940  ("endpoint", peer->get_remote_endpoint())("len", peer->ids_of_items_being_processed.size()));
2941 
2942  // if we just processed the last item in our list from this peer, we will want to
2943  // send another request to find out if we are now in sync (this is normally handled in
2944  // send_sync_block_to_node_delegate)
2945  if (peer->ids_of_items_to_get.empty() &&
2946  peer->number_of_unfetched_item_ids == 0 &&
2947  peer->ids_of_items_being_processed.empty())
2948  {
2949  dlog("We received last item in our list for peer ${endpoint}, setup to do a sync check", ("endpoint", peer->get_remote_endpoint()));
2950  peers_needing_next_batch.push_back( peer );
2951  }
2952  }
2953  }
2954  for( const peer_connection_ptr& peer : peers_needing_next_batch )
2956  }
2957 
2958  break; // start iterating _received_sync_items from the beginning
2959  } // end if potential_first_block
2960  } // end for each block in _received_sync_items
2961 
2963  {
2964  dlog("stopping processing sync block backlog because we have ${count} blocks in progress",
2965  ("count", _handle_message_calls_in_progress.size()));
2966  //ulog("stopping processing sync block backlog because we have ${count} blocks in progress, total on hand: ${received}",
2967  // ("count", _handle_message_calls_in_progress.size())("received", _received_sync_items.size()));
2970  break;
2971  }
2972  } while (block_processed_this_iteration);
2973 
2974  dlog("leaving process_backlog_of_sync_blocks, ${count} processed", ("count", blocks_processed));
2975 
2978  }
2979 
2981  {
2982  if (!_node_is_shutting_down &&
2984  _process_backlog_of_sync_blocks_done = fc::async([=](){ process_backlog_of_sync_blocks(); }, "process_backlog_of_sync_blocks");
2985  }
2986 
2988  const graphene::net::block_message& block_message_to_process, const message_hash_type& message_hash )
2989  {
2991  dlog( "received a sync block from peer ${endpoint}", ("endpoint", originating_peer->get_remote_endpoint() ) );
2992 
2993  // add it to the front of _received_sync_items, then process _received_sync_items to try to
2994  // pass as many messages as possible to the client.
2995  _new_received_sync_items.push_front( block_message_to_process );
2997  }
2998 
3000  const graphene::net::block_message& block_message_to_process,
3001  const message_hash_type& message_hash )
3002  {
3003  fc::time_point message_receive_time = fc::time_point::now();
3004 
3005  dlog( "received a block from peer ${endpoint}, passing it to client", ("endpoint", originating_peer->get_remote_endpoint() ) );
3006  std::set<peer_connection_ptr> peers_to_disconnect;
3007  std::string disconnect_reason;
3008  fc::oexception disconnect_exception;
3009  fc::oexception restart_sync_exception;
3010  try
3011  {
3012  // we can get into an intersting situation near the end of synchronization. We can be in
3013  // sync with one peer who is sending us the last block on the chain via a regular inventory
3014  // message, while at the same time still be synchronizing with a peer who is sending us the
3015  // block through the sync mechanism. Further, we must request both blocks because
3016  // we don't know they're the same (for the peer in normal operation, it has only told us the
3017  // message id, for the peer in the sync case we only known the block_id).
3018  fc::time_point message_validated_time;
3019  if (std::find(_most_recent_blocks_accepted.begin(), _most_recent_blocks_accepted.end(),
3020  block_message_to_process.block_id) == _most_recent_blocks_accepted.end())
3021  {
3022  std::vector<fc::uint160_t> contained_transaction_message_ids;
3023  _delegate->handle_block(block_message_to_process, false, contained_transaction_message_ids);
3024  message_validated_time = fc::time_point::now();
3025  ilog("Successfully pushed block ${num} (id:${id})",
3026  ("num", block_message_to_process.block.block_num())
3027  ("id", block_message_to_process.block_id));
3028  _most_recent_blocks_accepted.push_back(block_message_to_process.block_id);
3029 
3030  bool new_transaction_discovered = false;
3031  for (const item_hash_t& transaction_message_hash : contained_transaction_message_ids)
3032  {
3033  /*size_t items_erased =*/ _items_to_fetch.get<item_id_index>().erase(item_id(trx_message_type, transaction_message_hash));
3034  // there are two ways we could behave here: we could either act as if we received
3035  // the transaction outside the block and offer it to our peers, or we could just
3036  // forget about it (we would still advertise this block to our peers so they should
3037  // get the transaction through that mechanism).
3038  // We take the second approach, bring in the next if block to try the first approach
3039  //if (items_erased)
3040  //{
3041  // new_transaction_discovered = true;
3042  // _new_inventory.insert(item_id(trx_message_type, transaction_message_hash));
3043  //}
3044  }
3045  if (new_transaction_discovered)
3047  }
3048  else
3049  dlog( "Already received and accepted this block (presumably through sync mechanism), treating it as accepted" );
3050 
3051  dlog( "client validated the block, advertising it to other peers" );
3052 
3053  item_id block_message_item_id(core_message_type_enum::block_message_type, message_hash);
3054  uint32_t block_number = block_message_to_process.block.block_num();
3055  fc::time_point_sec block_time = block_message_to_process.block.timestamp;
3056  {
3058  for (const peer_connection_ptr& peer : _active_connections)
3059  {
3060  auto iter = peer->inventory_peer_advertised_to_us.find(block_message_item_id);
3061  if (iter != peer->inventory_peer_advertised_to_us.end())
3062  {
3063  // this peer offered us the item. It will eventually expire from the peer's
3064  // inventory_peer_advertised_to_us list after some time has passed (currently 2 minutes).
3065  // For now, it will remain there, which will prevent us from offering the peer this
3066  // block back when we rebroadcast the block below
3067  peer->last_block_delegate_has_seen = block_message_to_process.block_id;
3068  peer->last_block_time_delegate_has_seen = block_time;
3069  }
3070  peer->clear_old_inventory();
3071  }
3072  }
3073  message_propagation_data propagation_data{message_receive_time, message_validated_time, originating_peer->node_id};
3074  broadcast( block_message_to_process, propagation_data );
3076 
3077  if (is_hard_fork_block(block_number))
3078  {
3079  // we just pushed a hard fork block. Find out if any of our peers are running clients
3080  // that will be unable to process future blocks
3082  for (const peer_connection_ptr& peer : _active_connections)
3083  {
3084  if (peer->last_known_fork_block_number != 0)
3085  {
3086  uint32_t next_fork_block_number = get_next_known_hard_fork_block_number(peer->last_known_fork_block_number);
3087  if (next_fork_block_number != 0 &&
3088  next_fork_block_number <= block_number)
3089  {
3090  peers_to_disconnect.insert(peer);
3091 #ifdef ENABLE_DEBUG_ULOGS
3092  ulog("Disconnecting from peer because their version is too old. Their version date: ${date}", ("date", peer->graphene_git_revision_unix_timestamp));
3093 #endif
3094  }
3095  }
3096  }
3097  if (!peers_to_disconnect.empty())
3098  {
3099  std::ostringstream disconnect_reason_stream;
3100  disconnect_reason_stream << "You need to upgrade your client due to hard fork at block " << block_number;
3101  disconnect_reason = disconnect_reason_stream.str();
3102  disconnect_exception = fc::exception(FC_LOG_MESSAGE(error, "You need to upgrade your client due to hard fork at block ${block_number}",
3103  ("block_number", block_number)));
3104  }
3105  }
3106  }
3107  catch (const fc::canceled_exception&)
3108  {
3109  throw;
3110  }
3111  catch (const unlinkable_block_exception& e)
3112  {
3113  restart_sync_exception = e;
3114  }
3115  catch (const fc::exception& e)
3116  {
3117  // client rejected the block. Disconnect the client and any other clients that offered us this block
3118  auto block_num = block_message_to_process.block.block_num();
3119  wlog("Failed to push block ${num} (id:${id}), client rejected block sent by peer: ${e}",
3120  ("num", block_num)
3121  ("id", block_message_to_process.block_id)
3122  ("e",e));
3123 
3124  if( e.code() == block_timestamp_in_future_exception::code_enum::code_value )
3125  {
3126  disconnect_exception = block_timestamp_in_future_exception( FC_LOG_MESSAGE( warn, "",
3127  ("block_header", static_cast<graphene::protocol::block_header>(block_message_to_process.block))
3128  ("block_num", block_num)
3129  ("block_id", block_message_to_process.block_id) ) );
3130  }
3131  else
3132  disconnect_exception = e;
3133  disconnect_reason = "You offered me a block that I have deemed to be invalid";
3134 
3135  peers_to_disconnect.insert( originating_peer->shared_from_this() );
3137  for (const peer_connection_ptr& peer : _active_connections)
3138  if (!peer->ids_of_items_to_get.empty() && peer->ids_of_items_to_get.front() == block_message_to_process.block_id)
3139  peers_to_disconnect.insert(peer);
3140  }
3141 
3142  if (restart_sync_exception)
3143  {
3144  wlog("Peer ${peer} sent me a block that didn't link to our blockchain. Restarting sync mode with them to get the missing block. "
3145  "Error pushing block was: ${e}",
3146  ("peer", originating_peer->get_remote_endpoint())
3147  ("e", *restart_sync_exception));
3148  start_synchronizing_with_peer(originating_peer->shared_from_this());
3149  }
3150 
3151  for (const peer_connection_ptr& peer : peers_to_disconnect)
3152  {
3153  wlog("disconnecting client ${endpoint} because it offered us the rejected block", ("endpoint", peer->get_remote_endpoint()));
3154  disconnect_from_peer(peer.get(), disconnect_reason, true, *disconnect_exception);
3155  }
3156  }
3158  const message& message_to_process,
3159  const message_hash_type& message_hash)
3160  {
3162  // find out whether we requested this item while we were synchronizing or during normal operation
3163  // (it's possible that we request an item during normal operation and then get kicked into sync
3164  // mode before we receive and process the item. In that case, we should process the item as a normal
3165  // item to avoid confusing the sync code)
3166  graphene::net::block_message block_message_to_process(message_to_process.as<graphene::net::block_message>());
3167  auto item_iter = originating_peer->items_requested_from_peer.find(item_id(graphene::net::block_message_type, message_hash));
3168  if (item_iter != originating_peer->items_requested_from_peer.end())
3169  {
3170  originating_peer->items_requested_from_peer.erase(item_iter);
3171  process_block_during_normal_operation(originating_peer, block_message_to_process, message_hash);
3172  if (originating_peer->idle())
3174  return;
3175  }
3176  else
3177  {
3178  // not during normal operation. see if we requested it during sync
3179  auto sync_item_iter = originating_peer->sync_items_requested_from_peer.find( block_message_to_process.block_id);
3180  if (sync_item_iter != originating_peer->sync_items_requested_from_peer.end())
3181  {
3182  originating_peer->sync_items_requested_from_peer.erase(sync_item_iter);
3183  // if exceptions are throw here after removing the sync item from the list (above),
3184  // it could leave our sync in a stalled state. Wrap a try/catch around the rest
3185  // of the function so we can log if this ever happens.
3186  try
3187  {
3188  originating_peer->last_sync_item_received_time = fc::time_point::now();
3189  _active_sync_requests.erase(block_message_to_process.block_id);
3190  process_block_during_sync(originating_peer, block_message_to_process, message_hash);
3191  if (originating_peer->idle())
3192  {
3193  // we have finished fetching a batch of items, so we either need to grab another batch of items
3194  // or we need to get another list of item ids.
3195  if (originating_peer->number_of_unfetched_item_ids > 0 &&
3197  fetch_next_batch_of_item_ids_from_peer(originating_peer);
3198  else
3200  }
3201  return;
3202  }
3203  catch (const fc::canceled_exception& e)
3204  {
3205  throw;
3206  }
3207  catch (const fc::exception& e)
3208  {
3209  elog("Caught unexpected exception: ${e}", ("e", e));
3210  assert(false && "exceptions not expected here");
3211  }
3212  catch (const std::exception& e)
3213  {
3214  elog("Caught unexpected exception: ${e}", ("e", e.what()));
3215  assert(false && "exceptions not expected here");
3216  }
3217  catch (...)
3218  {
3219  elog("Caught unexpected exception, could break sync operation");
3220  }
3221  }
3222  }
3223 
3224  // if we get here, we didn't request the message, we must have a misbehaving peer
3225  wlog("received a block ${block_id} I didn't ask for from peer ${endpoint}, disconnecting from peer",
3226  ("endpoint", originating_peer->get_remote_endpoint())
3227  ("block_id", block_message_to_process.block_id));
3228  fc::exception detailed_error(FC_LOG_MESSAGE(error, "You sent me a block that I didn't ask for, block_id: ${block_id}",
3229  ("block_id", block_message_to_process.block_id)
3230  ("graphene_git_revision_sha", originating_peer->graphene_git_revision_sha)
3231  ("graphene_git_revision_unix_timestamp", originating_peer->graphene_git_revision_unix_timestamp)
3232  ("fc_git_revision_sha", originating_peer->fc_git_revision_sha)
3233  ("fc_git_revision_unix_timestamp", originating_peer->fc_git_revision_unix_timestamp)));
3234  disconnect_from_peer(originating_peer, "You sent me a block that I didn't ask for", true, detailed_error);
3235  }
3236 
3238  const current_time_request_message& current_time_request_message_received)
3239  {
3241  fc::time_point request_received_time(fc::time_point::now());
3242  current_time_reply_message reply(current_time_request_message_received.request_sent_time,
3243  request_received_time);
3244  originating_peer->send_message(reply, offsetof(current_time_reply_message, reply_transmitted_time));
3245  }
3246 
3248  const current_time_reply_message& current_time_reply_message_received)
3249  {
3251  fc::time_point reply_received_time = fc::time_point::now();
3252  originating_peer->clock_offset = fc::microseconds(((current_time_reply_message_received.request_received_time - current_time_reply_message_received.request_sent_time) +
3253  (current_time_reply_message_received.reply_transmitted_time - reply_received_time)).count() / 2);
3254  originating_peer->round_trip_delay = (reply_received_time - current_time_reply_message_received.request_sent_time) -
3255  (current_time_reply_message_received.reply_transmitted_time - current_time_reply_message_received.request_received_time);
3256  }
3257 
3259  {
3260  {
3262  for (const peer_connection_ptr& peer : _active_connections)
3263  {
3264  if (firewall_check_state->expected_node_id != peer->node_id && // it's not the node who is asking us to test
3265  !peer->firewall_check_state && // the peer isn't already performing a check for another node
3266  firewall_check_state->nodes_already_tested.find(peer->node_id) == firewall_check_state->nodes_already_tested.end() &&
3267  peer->core_protocol_version >= 106)
3268  {
3269  wlog("forwarding firewall check for node ${to_check} to peer ${checker}",
3270  ("to_check", firewall_check_state->endpoint_to_test)
3271  ("checker", peer->get_remote_endpoint()));
3272  firewall_check_state->nodes_already_tested.insert(peer->node_id);
3273  peer->firewall_check_state = firewall_check_state;
3274  check_firewall_message check_request;
3275  check_request.endpoint_to_check = firewall_check_state->endpoint_to_test;
3276  check_request.node_id = firewall_check_state->expected_node_id;
3277  peer->send_message(check_request);
3278  return;
3279  }
3280  }
3281  } // lock_guard
3282  wlog("Unable to forward firewall check for node ${to_check} to any other peers, returning 'unable'",
3283  ("to_check", firewall_check_state->endpoint_to_test));
3284 
3285  peer_connection_ptr originating_peer = get_peer_by_node_id(firewall_check_state->expected_node_id);
3286  if (originating_peer)
3287  {
3289  reply.node_id = firewall_check_state->expected_node_id;
3290  reply.endpoint_checked = firewall_check_state->endpoint_to_test;
3292  originating_peer->send_message(reply);
3293  }
3294  delete firewall_check_state;
3295  }
3296 
3298  const check_firewall_message& check_firewall_message_received)
3299  {
3301 
3302  if (check_firewall_message_received.node_id == node_id_t() &&
3303  check_firewall_message_received.endpoint_to_check == fc::ip::endpoint())
3304  {
3305  // originating_peer is asking us to test whether it is firewalled
3306  // we're not going to try to connect back to the originating peer directly,
3307  // instead, we're going to coordinate requests by asking some of our peers
3308  // to try to connect to the originating peer, and relay the results back
3309  wlog("Peer ${peer} wants us to check whether it is firewalled", ("peer", originating_peer->get_remote_endpoint()));
3310  firewall_check_state_data* firewall_check_state = new firewall_check_state_data;
3311  // if they are using the same inbound and outbound port, try connecting to their outbound endpoint.
3312  // if they are using a different inbound port, use their outbound address but the inbound port they reported
3313  fc::ip::endpoint endpoint_to_check = originating_peer->get_socket().remote_endpoint();
3314  if (originating_peer->inbound_port != originating_peer->outbound_port)
3315  endpoint_to_check = fc::ip::endpoint(endpoint_to_check.get_address(), originating_peer->inbound_port);
3316  firewall_check_state->endpoint_to_test = endpoint_to_check;
3317  firewall_check_state->expected_node_id = originating_peer->node_id;
3318  firewall_check_state->requesting_peer = originating_peer->node_id;
3319 
3320  forward_firewall_check_to_next_available_peer(firewall_check_state);
3321  }
3322  else
3323  {
3324  // we're being asked to check another node
3325  // first, find out if we're currently connected to that node. If we are, we
3326  // can't perform the test
3327  if (is_already_connected_to_id(check_firewall_message_received.node_id) ||
3328  is_connection_to_endpoint_in_progress(check_firewall_message_received.endpoint_to_check))
3329  {
3331  reply.node_id = check_firewall_message_received.node_id;
3332  reply.endpoint_checked = check_firewall_message_received.endpoint_to_check;
3334  }
3335  else
3336  {
3337  // we're not connected to them, so we need to set up a connection to them
3338  // to test.
3339  peer_connection_ptr peer_for_testing(peer_connection::make_shared(this));
3340  peer_for_testing->firewall_check_state = new firewall_check_state_data;
3341  peer_for_testing->firewall_check_state->endpoint_to_test = check_firewall_message_received.endpoint_to_check;
3342  peer_for_testing->firewall_check_state->expected_node_id = check_firewall_message_received.node_id;
3343  peer_for_testing->firewall_check_state->requesting_peer = originating_peer->node_id;
3344  peer_for_testing->set_remote_endpoint(check_firewall_message_received.endpoint_to_check);
3345  initiate_connect_to(peer_for_testing);
3346  }
3347  }
3348  }
3349 
3351  const check_firewall_reply_message& check_firewall_reply_message_received)
3352  {
3354 
3355  if (originating_peer->firewall_check_state &&
3356  originating_peer->firewall_check_state->requesting_peer != node_id_t())
3357  {
3358  // then this is a peer that is helping us check the firewalled state of one of our other peers
3359  // and they're reporting back
3360  // if they got a result, return it to the original peer. if they were unable to check,
3361  // we'll try another peer.
3362  wlog("Peer ${reporter} reports firewall check status ${status} for ${peer}",
3363  ("reporter", originating_peer->get_remote_endpoint())
3364  ("status", check_firewall_reply_message_received.result)
3365  ("peer", check_firewall_reply_message_received.endpoint_checked));
3366 
3367  if (check_firewall_reply_message_received.result == firewall_check_result::unable_to_connect ||
3368  check_firewall_reply_message_received.result == firewall_check_result::connection_successful)
3369  {
3371  if (original_peer)
3372  {
3373  if (check_firewall_reply_message_received.result == firewall_check_result::connection_successful)
3374  {
3375  // if we previously thought this peer was firewalled, mark them as not firewalled
3376  if (original_peer->is_firewalled != firewalled_state::not_firewalled)
3377  {
3378 
3379  original_peer->is_firewalled = firewalled_state::not_firewalled;
3380  // there should be no old entry if we thought they were firewalled, so just create a new one
3381  fc::optional<fc::ip::endpoint> inbound_endpoint = originating_peer->get_endpoint_for_connecting();
3382  if (inbound_endpoint)
3383  {
3384  potential_peer_record updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_endpoint(*inbound_endpoint);
3385  updated_peer_record.last_seen_time = fc::time_point::now();
3386  _potential_peer_db.update_entry(updated_peer_record);
3387  }
3388  }
3389  }
3390  original_peer->send_message(check_firewall_reply_message_received);
3391  }
3392  delete originating_peer->firewall_check_state;
3393  originating_peer->firewall_check_state = nullptr;
3394  }
3395  else
3396  {
3397  // they were unable to check for us, ask another peer
3398  firewall_check_state_data* firewall_check_state = originating_peer->firewall_check_state;
3399  originating_peer->firewall_check_state = nullptr;
3400  forward_firewall_check_to_next_available_peer(firewall_check_state);
3401  }
3402  }
3403  else if (originating_peer->firewall_check_state)
3404  {
3405  // this is a reply to a firewall check we initiated.
3406  wlog("Firewall check we initiated has returned with result: ${result}, endpoint = ${endpoint}",
3407  ("result", check_firewall_reply_message_received.result)
3408  ("endpoint", check_firewall_reply_message_received.endpoint_checked));
3409  if (check_firewall_reply_message_received.result == firewall_check_result::connection_successful)
3410  {
3412  _publicly_visible_listening_endpoint = check_firewall_reply_message_received.endpoint_checked;
3413  }
3414  else if (check_firewall_reply_message_received.result == firewall_check_result::unable_to_connect)
3415  {
3418  }
3419  delete originating_peer->firewall_check_state;
3420  originating_peer->firewall_check_state = nullptr;
3421  }
3422  else
3423  {
3424  wlog("Received a firewall check reply to a request I never sent");
3425  }
3426 
3427  }
3428 
3430  const get_current_connections_request_message& get_current_connections_request_message_received)
3431  {
3434 
3436  {
3439 
3440  size_t minutes_to_average = std::min(_average_network_write_speed_minutes.size(), (size_t)15);
3441  boost::circular_buffer<uint32_t>::iterator start_iter = _average_network_write_speed_minutes.end() - minutes_to_average;
3442  reply.upload_rate_fifteen_minutes = std::accumulate(start_iter, _average_network_write_speed_minutes.end(), 0) / (uint32_t)minutes_to_average;
3443  start_iter = _average_network_read_speed_minutes.end() - minutes_to_average;
3444  reply.download_rate_fifteen_minutes = std::accumulate(start_iter, _average_network_read_speed_minutes.end(), 0) / (uint32_t)minutes_to_average;
3445 
3446  minutes_to_average = std::min(_average_network_write_speed_minutes.size(), (size_t)60);
3447  start_iter = _average_network_write_speed_minutes.end() - minutes_to_average;
3448  reply.upload_rate_one_hour = std::accumulate(start_iter, _average_network_write_speed_minutes.end(), 0) / (uint32_t)minutes_to_average;
3449  start_iter = _average_network_read_speed_minutes.end() - minutes_to_average;
3450  reply.download_rate_one_hour = std::accumulate(start_iter, _average_network_read_speed_minutes.end(), 0) / (uint32_t)minutes_to_average;
3451  }
3452 
3455  for (const peer_connection_ptr& peer : _active_connections)
3456  {
3457  current_connection_data data_for_this_peer;
3458  data_for_this_peer.connection_duration = now.sec_since_epoch() - peer->connection_initiation_time.sec_since_epoch();
3459  if (peer->get_remote_endpoint()) // should always be set for anyone we're actively connected to
3460  data_for_this_peer.remote_endpoint = *peer->get_remote_endpoint();
3461  data_for_this_peer.clock_offset = peer->clock_offset;
3462  data_for_this_peer.round_trip_delay = peer->round_trip_delay;
3463  data_for_this_peer.node_id = peer->node_id;
3464  data_for_this_peer.connection_direction = peer->direction;
3465  data_for_this_peer.firewalled = peer->is_firewalled;
3466  fc::mutable_variant_object user_data;
3467  if (peer->graphene_git_revision_sha)
3468  user_data["graphene_git_revision_sha"] = *peer->graphene_git_revision_sha;
3469  if (peer->graphene_git_revision_unix_timestamp)
3470  user_data["graphene_git_revision_unix_timestamp"] = *peer->graphene_git_revision_unix_timestamp;
3471  if (peer->fc_git_revision_sha)
3472  user_data["fc_git_revision_sha"] = *peer->fc_git_revision_sha;
3473  if (peer->fc_git_revision_unix_timestamp)
3474  user_data["fc_git_revision_unix_timestamp"] = *peer->fc_git_revision_unix_timestamp;
3475  if (peer->platform)
3476  user_data["platform"] = *peer->platform;
3477  if (peer->bitness)
3478  user_data["bitness"] = *peer->bitness;
3479  user_data["user_agent"] = peer->user_agent;
3480 
3481  user_data["last_known_block_hash"] = fc::variant( peer->last_block_delegate_has_seen, 1 );
3482  user_data["last_known_block_number"] = _delegate->get_block_number(peer->last_block_delegate_has_seen);
3483  user_data["last_known_block_time"] = peer->last_block_time_delegate_has_seen;
3484 
3485  data_for_this_peer.user_data = user_data;
3486  reply.current_connections.emplace_back(data_for_this_peer);
3487  }
3488  originating_peer->send_message(reply);
3489  }
3490 
3492  const get_current_connections_reply_message& get_current_connections_reply_message_received)
3493  {
3495  }
3496 
3497 
3498  // this handles any message we get that doesn't require any special processing.
3499  // currently, this is any message other than block messages and p2p-specific
3500  // messages. (transaction messages would be handled here, for example)
3501  // this just passes the message to the client, and does the bookkeeping
3502  // related to requesting and rebroadcasting the message.
3504  const message& message_to_process,
3505  const message_hash_type& message_hash )
3506  {
3508  fc::time_point message_receive_time = fc::time_point::now();
3509 
3510  // only process it if we asked for it
3511  auto iter = originating_peer->items_requested_from_peer.find(
3512  item_id(message_to_process.msg_type.value(), message_hash) );
3513  if( iter == originating_peer->items_requested_from_peer.end() )
3514  {
3515  wlog( "received a message I didn't ask for from peer ${endpoint}, disconnecting from peer",
3516  ( "endpoint", originating_peer->get_remote_endpoint() ) );
3517  fc::exception detailed_error( FC_LOG_MESSAGE(error,
3518  "You sent me a message that I didn't ask for, message_hash: ${message_hash}",
3519  ( "message_hash", message_hash ) ) );
3520  disconnect_from_peer( originating_peer, "You sent me a message that I didn't request", true, detailed_error );
3521  return;
3522  }
3523  else
3524  {
3525  originating_peer->items_requested_from_peer.erase( iter );
3526  if (originating_peer->idle())
3528 
3529  // Next: have the delegate process the message
3530  fc::time_point message_validated_time;
3531  try
3532  {
3533  if (message_to_process.msg_type.value() == trx_message_type)
3534  {
3535  trx_message transaction_message_to_process = message_to_process.as<trx_message>();
3536  dlog( "passing message containing transaction ${trx} to client",
3537  ("trx", transaction_message_to_process.trx.id()) );
3538  _delegate->handle_transaction(transaction_message_to_process);
3539  }
3540  else
3541  _delegate->handle_message( message_to_process );
3542  message_validated_time = fc::time_point::now();
3543  }
3544  catch ( const fc::canceled_exception& )
3545  {
3546  throw;
3547  }
3548  catch ( const fc::exception& e )
3549  {
3550  switch( e.code() )
3551  {
3552  // log common exceptions in debug level
3553  case graphene::chain::duplicate_transaction::code_enum::code_value :
3554  case graphene::chain::limit_order_create_kill_unfilled::code_enum::code_value :
3555  case graphene::chain::limit_order_create_market_not_whitelisted::code_enum::code_value :
3556  case graphene::chain::limit_order_create_market_blacklisted::code_enum::code_value :
3557  case graphene::chain::limit_order_create_selling_asset_unauthorized::code_enum::code_value :
3558  case graphene::chain::limit_order_create_receiving_asset_unauthorized::code_enum::code_value :
3559  case graphene::chain::limit_order_create_insufficient_balance::code_enum::code_value :
3560  case graphene::chain::limit_order_cancel_nonexist_order::code_enum::code_value :
3561  case graphene::chain::limit_order_cancel_owner_mismatch::code_enum::code_value :
3562  dlog( "client rejected message sent by peer ${peer}, ${e}",
3563  ("peer", originating_peer->get_remote_endpoint() )("e", e) );
3564  break;
3565  // log rarer exceptions in warn level
3566  default:
3567  wlog( "client rejected message sent by peer ${peer}, ${e}",
3568  ("peer", originating_peer->get_remote_endpoint() )("e", e) );
3569  break;
3570  }
3571  // record it so we don't try to fetch this item again
3573  item_id( message_to_process.msg_type.value(), message_hash ), fc::time_point::now() ) );
3574  return;
3575  }
3576 
3577  // finally, if the delegate validated the message, broadcast it to our other peers
3578  message_propagation_data propagation_data { message_receive_time, message_validated_time,
3579  originating_peer->node_id };
3580  broadcast( message_to_process, propagation_data );
3581  }
3582  }
3583 
3585  {
3587  peer->ids_of_items_to_get.clear();
3588  peer->number_of_unfetched_item_ids = 0;
3589  peer->we_need_sync_items_from_peer = true;
3590  peer->last_block_delegate_has_seen = item_hash_t();
3591  peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hash_t());
3592  peer->inhibit_fetching_sync_blocks = false;
3594  }
3595 
3597  {
3599  for( const peer_connection_ptr& peer : _active_connections )
3601  }
3602 
3604  {
3606  peer->send_message(current_time_request_message(),
3607  offsetof(current_time_request_message, request_sent_time));
3610  {
3612  _delegate->connection_count_changed( _last_reported_number_of_connections );
3613  }
3614  }
3615 
3617  {
3619 
3620  try
3621  {
3623  }
3624  catch ( const fc::exception& e )
3625  {
3626  wlog( "Exception thrown while closing P2P peer database, ignoring: ${e}", ("e", e) );
3627  }
3628  catch (...)
3629  {
3630  wlog( "Exception thrown while closing P2P peer database, ignoring" );
3631  }
3632 
3633  // First, stop accepting incoming network connections
3634  try
3635  {
3636  _tcp_server.close();
3637  dlog("P2P TCP server closed");
3638  }
3639  catch ( const fc::exception& e )
3640  {
3641  wlog( "Exception thrown while closing P2P TCP server, ignoring: ${e}", ("e", e) );
3642  }
3643  catch (...)
3644  {
3645  wlog( "Exception thrown while closing P2P TCP server, ignoring" );
3646  }
3647 
3648  try
3649  {
3650  _accept_loop_complete.cancel_and_wait("node_impl::close()");
3651  dlog("P2P accept loop terminated");
3652  }
3653  catch ( const fc::exception& e )
3654  {
3655  wlog( "Exception thrown while terminating P2P accept loop, ignoring: ${e}", ("e", e) );
3656  }
3657  catch (...)
3658  {
3659  wlog( "Exception thrown while terminating P2P accept loop, ignoring" );
3660  }
3661 
3662  // terminate all of our long-running loops (these run continuously instead of rescheduling themselves)
3663  try
3664  {
3665  _p2p_network_connect_loop_done.cancel("node_impl::close()");
3666  // cancel() is currently broken, so we need to wake up the task to allow it to finish
3669  dlog("P2P connect loop terminated");
3670  }
3671  catch ( const fc::canceled_exception& )
3672  {
3673  dlog("P2P connect loop terminated");
3674  }
3675  catch ( const fc::exception& e )
3676  {
3677  wlog( "Exception thrown while terminating P2P connect loop, ignoring: ${e}", ("e", e) );
3678  }
3679  catch (...)
3680  {
3681  wlog( "Exception thrown while terminating P2P connect loop, ignoring" );
3682  }
3683 
3684  try
3685  {
3687  dlog("Process backlog of sync items task terminated");
3688  }
3689  catch ( const fc::canceled_exception& )
3690  {
3691  dlog("Process backlog of sync items task terminated");
3692  }
3693  catch ( const fc::exception& e )
3694  {
3695  wlog( "Exception thrown while terminating Process backlog of sync items task, ignoring: ${e}", ("e", e) );
3696  }
3697  catch (...)
3698  {
3699  wlog( "Exception thrown while terminating Process backlog of sync items task, ignoring" );
3700  }
3701 
3702  unsigned handle_message_call_count = 0;
3703  while( true )
3704  {
3705  auto it = _handle_message_calls_in_progress.begin();
3706  if( it == _handle_message_calls_in_progress.end() )
3707  break;
3708  if( it->ready() || it->error() || it->canceled() )
3709  {
3711  continue;
3712  }
3713  ++handle_message_call_count;
3714  try
3715  {
3716  it->cancel_and_wait("node_impl::close()");
3717  dlog("handle_message call #${count} task terminated", ("count", handle_message_call_count));
3718  }
3719  catch ( const fc::canceled_exception& )
3720  {
3721  dlog("handle_message call #${count} task terminated", ("count", handle_message_call_count));
3722  }
3723  catch ( const fc::exception& e )
3724  {
3725  wlog("Exception thrown while terminating handle_message call #${count} task, ignoring: ${e}", ("e", e)("count", handle_message_call_count));
3726  }
3727  catch (...)
3728  {
3729  wlog("Exception thrown while terminating handle_message call #${count} task, ignoring",("count", handle_message_call_count));
3730  }
3731  }
3732 
3733  try
3734  {
3735  _fetch_sync_items_loop_done.cancel("node_impl::close()");
3736  // cancel() is currently broken, so we need to wake up the task to allow it to finish
3739  dlog("Fetch sync items loop terminated");
3740  }
3741  catch ( const fc::canceled_exception& )
3742  {
3743  dlog("Fetch sync items loop terminated");
3744  }
3745  catch ( const fc::exception& e )
3746  {
3747  wlog( "Exception thrown while terminating Fetch sync items loop, ignoring: ${e}", ("e", e) );
3748  }
3749  catch (...)
3750  {
3751  wlog( "Exception thrown while terminating Fetch sync items loop, ignoring" );
3752  }
3753 
3754  try
3755  {
3756  _fetch_item_loop_done.cancel("node_impl::close()");
3757  // cancel() is currently broken, so we need to wake up the task to allow it to finish
3760  dlog("Fetch items loop terminated");
3761  }
3762  catch ( const fc::canceled_exception& )
3763  {
3764  dlog("Fetch items loop terminated");
3765  }
3766  catch ( const fc::exception& e )
3767  {
3768  wlog( "Exception thrown while terminating Fetch items loop, ignoring: ${e}", ("e", e) );
3769  }
3770  catch (...)
3771  {
3772  wlog( "Exception thrown while terminating Fetch items loop, ignoring" );
3773  }
3774 
3775  try
3776  {
3777  _advertise_inventory_loop_done.cancel("node_impl::close()");
3778  // cancel() is currently broken, so we need to wake up the task to allow it to finish
3781  dlog("Advertise inventory loop terminated");
3782  }
3783  catch ( const fc::canceled_exception& )
3784  {
3785  dlog("Advertise inventory loop terminated");
3786  }
3787  catch ( const fc::exception& e )
3788  {
3789  wlog( "Exception thrown while terminating Advertise inventory loop, ignoring: ${e}", ("e", e) );
3790  }
3791  catch (...)
3792  {
3793  wlog( "Exception thrown while terminating Advertise inventory loop, ignoring" );
3794  }
3795 
3796 
3797  // Next, terminate our existing connections. First, close all of the connections nicely.
3798  // This will close the sockets and may result in calls to our "on_connection_closing"
3799  // method to inform us that the connection really closed (or may not if we manage to cancel
3800  // the read loop before it gets an EOF).
3801  // operate off copies of the lists in case they change during iteration
3802  std::list<peer_connection_ptr> all_peers;
3803  auto p_back = [&all_peers](const peer_connection_ptr& conn) { all_peers.push_back(conn); };
3804  {
3807  }
3808  {
3811  }
3812  {
3815  }
3816 
3817  for (const peer_connection_ptr& peer : all_peers)
3818  {
3819  try
3820  {
3821  peer->destroy_connection();
3822  }
3823  catch ( const fc::exception& e )
3824  {
3825  wlog( "Exception thrown while closing peer connection, ignoring: ${e}", ("e", e) );
3826  }
3827  catch (...)
3828  {
3829  wlog( "Exception thrown while closing peer connection, ignoring" );
3830  }
3831  }
3832 
3833  // and delete all of the peer_connection objects
3837  all_peers.clear();
3838 
3839  {
3840 #ifdef USE_PEERS_TO_DELETE_MUTEX
3841  fc::scoped_lock<fc::mutex> lock(_peers_to_delete_mutex);
3842 #endif
3843  try
3844  {
3845  _delayed_peer_deletion_task_done.cancel_and_wait("node_impl::close()");
3846  dlog("Delayed peer deletion task terminated");
3847  }
3848  catch ( const fc::exception& e )
3849  {
3850  wlog( "Exception thrown while terminating Delayed peer deletion task, ignoring: ${e}", ("e", e) );
3851  }
3852  catch (...)
3853  {
3854  wlog( "Exception thrown while terminating Delayed peer deletion task, ignoring" );
3855  }
3856  _peers_to_delete.clear();
3857  }
3858 
3859  // Now that there are no more peers that can call methods on us, there should be no
3860  // chance for one of our loops to be rescheduled, so we can safely terminate all of
3861  // our loops now
3862  try
3863  {
3865  dlog("Terminate inactive connections loop terminated");
3866  }
3867  catch ( const fc::exception& e )
3868  {
3869  wlog( "Exception thrown while terminating Terminate inactive connections loop, ignoring: ${e}", ("e", e) );
3870  }
3871  catch (...)
3872  {
3873  wlog( "Exception thrown while terminating Terminate inactive connections loop, ignoring" );
3874  }
3875 
3876  try
3877  {
3879  dlog("Fetch updated peer lists loop terminated");
3880  }
3881  catch ( const fc::exception& e )
3882  {
3883  wlog( "Exception thrown while terminating Fetch updated peer lists loop, ignoring: ${e}", ("e", e) );
3884  }
3885  catch (...)
3886  {
3887  wlog( "Exception thrown while terminating Fetch updated peer lists loop, ignoring" );
3888  }
3889 
3890  try
3891  {
3892  _update_seed_nodes_loop_done.cancel_and_wait("node_impl::close()");
3893  dlog("Update seed nodes loop terminated");
3894  }
3895  catch ( const fc::exception& e )
3896  {
3897  wlog( "Exception thrown while terminating Update seed nodes loop, ignoring: ${e}", ("e", e) );
3898  }
3899  catch (...)
3900  {
3901  wlog( "Exception thrown while terminating Update seed nodes loop, ignoring" );
3902  }
3903 
3904  try
3905  {
3906  _bandwidth_monitor_loop_done.cancel_and_wait("node_impl::close()");
3907  dlog("Bandwidth monitor loop terminated");
3908  }
3909  catch ( const fc::exception& e )
3910  {
3911  wlog( "Exception thrown while terminating Bandwidth monitor loop, ignoring: ${e}", ("e", e) );
3912  }
3913  catch (...)
3914  {
3915  wlog( "Exception thrown while terminating Bandwidth monitor loop, ignoring" );
3916  }
3917 
3918  try
3919  {
3920  _dump_node_status_task_done.cancel_and_wait("node_impl::close()");
3921  dlog("Dump node status task terminated");
3922  }
3923  catch ( const fc::exception& e )
3924  {
3925  wlog( "Exception thrown while terminating Dump node status task, ignoring: ${e}", ("e", e) );
3926  }
3927  catch (...)
3928  {
3929  wlog( "Exception thrown while terminating Dump node status task, ignoring" );
3930  }
3931  } // node_impl::close()
3932 
3934  {
3936  new_peer->accept_connection(); // this blocks until the secure connection is fully negotiated
3937  send_hello_message(new_peer);
3938  }
3939 
3941  {
3943  while ( !_accept_loop_complete.canceled() )
3944  {
3946 
3947  try
3948  {
3949  _tcp_server.accept( new_peer->get_socket() );
3950  ilog( "accepted inbound connection from ${remote_endpoint}", ("remote_endpoint", new_peer->get_socket().remote_endpoint() ) );
3952  return;
3953  new_peer->connection_initiation_time = fc::time_point::now();
3954  _handshaking_connections.insert( new_peer );
3955  _rate_limiter.add_tcp_socket( &new_peer->get_socket() );
3956  std::weak_ptr<peer_connection> new_weak_peer(new_peer);
3957  new_peer->accept_or_connect_task_done = fc::async( [this, new_weak_peer]() {
3958  peer_connection_ptr new_peer(new_weak_peer.lock());
3959  assert(new_peer);
3960  if (!new_peer)
3961  return;
3962  accept_connection_task(new_peer);
3963  }, "accept_connection_task" );
3964 
3965  // limit the rate at which we accept connections to mitigate DOS attacks
3967  } FC_CAPTURE_AND_LOG( (0) )
3968  }
3969  } // accept_loop()
3970 
3972  {
3975 
3976  fc::sha256::encoder shared_secret_encoder;
3977  fc::sha512 shared_secret = peer->get_shared_secret();
3978  shared_secret_encoder.write(shared_secret.data(), sizeof(shared_secret));
3980 
3981  // in the hello messsage, we send three things:
3982  // ip address
3983  // outbound port
3984  // inbound port
3985  // The peer we're connecting to will assume we're firewalled if the
3986  // ip address and outbound port we send don't match the values it sees on its remote endpoint
3987  //
3988  // if we know that we're behind a NAT that will allow incoming connections because our firewall
3989  // detection figured it out, send those values instead.
3990 
3991  fc::ip::endpoint local_endpoint(peer->get_socket().local_endpoint());
3993 
3996  {
3997  local_endpoint = *_publicly_visible_listening_endpoint;
3998  listening_port = _publicly_visible_listening_endpoint->port();
3999  }
4000 
4003  local_endpoint.get_address(),
4004  listening_port,
4005  local_endpoint.port(),
4007  signature,
4008  _chain_id,
4010 
4011  peer->send_message(message(hello));
4012  }
4013 
4015  const fc::ip::endpoint& remote_endpoint)
4016  {
4018 
4019  if (!new_peer->performing_firewall_check())
4020  {
4021  // create or find the database entry for the new peer
4022  // if we're connecting to them, we believe they're not firewalled
4023  potential_peer_record updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_endpoint(remote_endpoint);
4025  updated_peer_record.last_connection_attempt_time = fc::time_point::now();;
4026  _potential_peer_db.update_entry(updated_peer_record);
4027  }
4028  else
4029  {
4030  wlog("connecting to peer ${peer} for firewall check", ("peer", new_peer->get_remote_endpoint()));
4031  }
4032 
4033  fc::oexception connect_failed_exception;
4034 
4035  try
4036  {
4037  new_peer->connect_to(remote_endpoint, _actual_listening_endpoint); // blocks until the connection is established and secure connection is negotiated
4038 
4039  // we connected to the peer. guess they're not firewalled....
4040  new_peer->is_firewalled = firewalled_state::not_firewalled;
4041 
4042  // connection succeeded, we've started handshaking. record that in our database
4043  potential_peer_record updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_endpoint(remote_endpoint);
4045  updated_peer_record.number_of_successful_connection_attempts++;
4046  updated_peer_record.last_seen_time = fc::time_point::now();
4047  _potential_peer_db.update_entry(updated_peer_record);
4048  }
4049  catch (const fc::exception& except)
4050  {
4051  connect_failed_exception = except;
4052  }
4053 
4054  if (connect_failed_exception && !new_peer->performing_firewall_check())
4055  {
4056  // connection failed. record that in our database
4057  potential_peer_record updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_endpoint(remote_endpoint);
4059  updated_peer_record.number_of_failed_connection_attempts++;
4060  if (new_peer->connection_closed_error)
4061  updated_peer_record.last_error = *new_peer->connection_closed_error;
4062  else
4063  updated_peer_record.last_error = *connect_failed_exception;
4064  _potential_peer_db.update_entry(updated_peer_record);
4065  }
4066 
4067  if (new_peer->performing_firewall_check())
4068  {
4069  // we were connecting to test whether the node is firewalled, and we now know the result.
4070  // send a message back to the requester
4071  peer_connection_ptr requesting_peer = get_peer_by_node_id(new_peer->firewall_check_state->requesting_peer);
4072  if (requesting_peer)
4073  {
4075  reply.endpoint_checked = new_peer->firewall_check_state->endpoint_to_test;
4076  reply.node_id = new_peer->firewall_check_state->expected_node_id;
4077  reply.result = connect_failed_exception ?
4080  wlog("firewall check of ${peer_checked} ${success_or_failure}, sending reply to ${requester}",
4081  ("peer_checked", new_peer->get_remote_endpoint())
4082  ("success_or_failure", connect_failed_exception ? "failed" : "succeeded" )
4083  ("requester", requesting_peer->get_remote_endpoint()));
4084 
4085  requesting_peer->send_message(reply);
4086  }
4087  }
4088 
4089  if (connect_failed_exception || new_peer->performing_firewall_check())
4090  {
4091  // if the connection failed or if this connection was just intended to check
4092  // whether the peer is firewalled, we want to disconnect now.
4093  _handshaking_connections.erase(new_peer);
4094  _terminating_connections.erase(new_peer);
4095  assert(_active_connections.find(new_peer) == _active_connections.end());
4096  _active_connections.erase(new_peer);
4097  assert(_closing_connections.find(new_peer) == _closing_connections.end());
4098  _closing_connections.erase(new_peer);
4099 
4102  schedule_peer_for_deletion(new_peer);
4103 
4104  if (connect_failed_exception)
4105  throw *connect_failed_exception;
4106  }
4107  else
4108  {
4109  // connection was successful and we want to stay connected
4110  fc::ip::endpoint local_endpoint = new_peer->get_local_endpoint();
4111  new_peer->inbound_address = local_endpoint.get_address();
4113  new_peer->outbound_port = local_endpoint.port();
4114 
4117  send_hello_message(new_peer);
4118  dlog("Sent \"hello\" to peer ${peer}", ("peer", new_peer->get_remote_endpoint()));
4119  }
4120  }
4121 
4122  // methods implementing node's public interface
4123  void node_impl::set_node_delegate(node_delegate* del, fc::thread* thread_for_delegate_calls)
4124  {
4126  _delegate.reset();
4127  if (del)
4128  _delegate.reset(new statistics_gathering_node_delegate_wrapper(del, thread_for_delegate_calls));
4129  if( _delegate )
4130  _chain_id = del->get_chain_id();
4131  }
4132 
4133  void node_impl::load_configuration( const fc::path& configuration_directory )
4134  {
4136  _node_configuration_directory = configuration_directory;
4138  bool node_configuration_loaded = false;
4139  if( fc::exists(configuration_file_name ) )
4140  {
4141  try
4142  {
4144  ilog( "Loaded configuration from file ${filename}", ("filename", configuration_file_name ) );
4145 
4148 
4149  node_configuration_loaded = true;
4150  }
4151  catch ( fc::parse_error_exception& parse_error )
4152  {
4153  elog( "malformed node configuration file ${filename}: ${error}",
4154  ( "filename", configuration_file_name )("error", parse_error.to_detail_string() ) );
4155  }
4156  catch ( fc::exception& except )
4157  {
4158  elog( "unexpected exception while reading configuration file ${filename}: ${error}",
4159  ( "filename", configuration_file_name )("error", except.to_detail_string() ) );
4160  }
4161  }
4162 
4163  if( !node_configuration_loaded )
4164  {
4166 
4167 #ifdef GRAPHENE_TEST_NETWORK
4168  uint32_t port = GRAPHENE_NET_TEST_P2P_PORT;
4169 #else
4170  uint32_t port = GRAPHENE_NET_DEFAULT_P2P_PORT;
4171 #endif
4175 
4176  ilog( "generating new private key for this node" );
4178  }
4179 
4181 
4182  fc::path potential_peer_database_file_name(_node_configuration_directory / POTENTIAL_PEER_DATABASE_FILENAME);
4183  try
4184  {
4185  _potential_peer_db.open(potential_peer_database_file_name);
4186 
4187  // push back the time on all peers loaded from the database so we will be able to retry them immediately
4189  {
4190  potential_peer_record updated_peer_record = *itr;
4191  updated_peer_record.last_connection_attempt_time = std::min<fc::time_point_sec>(updated_peer_record.last_connection_attempt_time,
4193  _potential_peer_db.update_entry(updated_peer_record);
4194  }
4195 
4197  }
4198  catch (fc::exception& except)
4199  {
4200  elog("unable to open peer database ${filename}: ${error}",
4201  ("filename", potential_peer_database_file_name)("error", except.to_detail_string()));
4202  throw;
4203  }
4204  }
4205 
4207  {
4210  {
4211  wlog("accept_incoming_connections is false, p2p network will not accept any incoming connections");
4212  return;
4213  }
4214 
4216 
4218  if( listen_endpoint.port() != 0 )
4219  {
4220  // if the user specified a port, we only want to bind to it if it's not already
4221  // being used by another application. During normal operation, we set the
4222  // SO_REUSEADDR/SO_REUSEPORT flags so that we can bind outbound sockets to the
4223  // same local endpoint as we're listening on here. On some platforms, setting
4224  // those flags will prevent us from detecting that other applications are
4225  // listening on that port. We'd like to detect that, so we'll set up a temporary
4226  // tcp server without that flag to see if we can listen on that port.
4227  bool first = true;
4228  for( ;; )
4229  {
4230  bool listen_failed = false;
4231 
4232  try
4233  {
4234  fc::tcp_server temporary_server;
4235  if( listen_endpoint.get_address() != fc::ip::address() )
4236  temporary_server.listen( listen_endpoint );
4237  else
4238  temporary_server.listen( listen_endpoint.port() );
4239  break;
4240  }
4241  catch ( const fc::exception&)
4242  {
4243  listen_failed = true;
4244  }
4245 
4246  if (listen_failed)
4247  {
4249  {
4250  std::ostringstream error_message_stream;
4251  if( first )
4252  {
4253  error_message_stream << "Unable to listen for connections on port " << listen_endpoint.port()
4254  << ", retrying in a few seconds\n";
4255  error_message_stream << "You can wait for it to become available, or restart this program using\n";
4256  error_message_stream << "the --p2p-endpoint option to specify another port\n";
4257  first = false;
4258  }
4259  else
4260  {
4261  error_message_stream << "\nStill waiting for port " << listen_endpoint.port() << " to become available\n";
4262  }
4263  std::string error_message = error_message_stream.str();
4264  wlog(error_message);
4265  std::cout << "\033[31m" << error_message;
4266  _delegate->error_encountered( error_message, fc::oexception() );
4267  fc::usleep( fc::seconds(5 ) );
4268  }
4269  else // don't wait, just find a random port
4270  {
4271  wlog( "unable to bind on the requested endpoint ${endpoint}, which probably means that endpoint is already in use",
4272  ( "endpoint", listen_endpoint ) );
4273  listen_endpoint.set_port( 0 );
4274  }
4275  } // if (listen_failed)
4276  } // for(;;)
4277  } // if (listen_endpoint.port() != 0)
4278  else // port is 0
4279  {
4280  // if they requested a random port, we'll just assume it's available
4281  // (it may not be due to ip address, but we'll detect that in the next step)
4282  }
4283 
4285  try
4286  {
4287  if( listen_endpoint.get_address() != fc::ip::address() )
4288  _tcp_server.listen( listen_endpoint );
4289  else
4290  _tcp_server.listen( listen_endpoint.port() );
4292  ilog( "listening for connections on endpoint ${endpoint} (our first choice)",
4293  ( "endpoint", _actual_listening_endpoint ) );
4294  }
4295  catch ( fc::exception& e )
4296  {
4297  FC_RETHROW_EXCEPTION( e, error, "unable to listen on ${endpoint}", ("endpoint",listen_endpoint ) );
4298  }
4299  }
4300 
4302  {
4305 
4306  assert(!_accept_loop_complete.valid() &&
4317  _accept_loop_complete = fc::async( [=](){ accept_loop(); }, "accept_loop");
4318  _p2p_network_connect_loop_done = fc::async( [=]() { p2p_network_connect_loop(); }, "p2p_network_connect_loop" );
4319  _fetch_sync_items_loop_done = fc::async( [=]() { fetch_sync_items_loop(); }, "fetch_sync_items_loop" );
4320  _fetch_item_loop_done = fc::async( [=]() { fetch_items_loop(); }, "fetch_items_loop" );
4321  _advertise_inventory_loop_done = fc::async( [=]() { advertise_inventory_loop(); }, "advertise_inventory_loop" );
4322  _terminate_inactive_connections_loop_done = fc::async( [=]() { terminate_inactive_connections_loop(); }, "terminate_inactive_connections_loop" );
4323  _fetch_updated_peer_lists_loop_done = fc::async([=](){ fetch_updated_peer_lists_loop(); }, "fetch_updated_peer_lists_loop");
4324  _bandwidth_monitor_loop_done = fc::async([=](){ bandwidth_monitor_loop(); }, "bandwidth_monitor_loop");
4325  _dump_node_status_task_done = fc::async([=](){ dump_node_status_task(); }, "dump_node_status_task");
4327  }
4328 
4330  {
4332  // if we're connecting to them, we believe they're not firewalled
4334 
4335  // if we've recently connected to this peer, reset the last_connection_attempt_time to allow
4336  // us to immediately retry this peer
4337  updated_peer_record.last_connection_attempt_time = std::min<fc::time_point_sec>(updated_peer_record.last_connection_attempt_time,
4339  _add_once_node_list.push_back(updated_peer_record);
4340  _potential_peer_db.update_entry(updated_peer_record);
4342  }
4343 
4344  void node_impl::add_seed_node(const std::string& endpoint_string)
4345  {
4347  _seed_nodes.insert( endpoint_string );
4348  resolve_seed_node_and_add( endpoint_string );
4349  }
4350 
4351  void node_impl::resolve_seed_node_and_add(const std::string& endpoint_string)
4352  {
4354  std::vector<fc::ip::endpoint> endpoints;
4355  ilog("Resolving seed node ${endpoint}", ("endpoint", endpoint_string));
4356  try
4357  {
4358  endpoints = graphene::net::node::resolve_string_to_ip_endpoints(endpoint_string);
4359  }
4360  catch(...)
4361  {
4362  wlog( "Unable to resolve endpoint during attempt to add seed node ${ep}", ("ep", endpoint_string) );
4363  }
4364  for (const fc::ip::endpoint& endpoint : endpoints)
4365  {
4366  ilog("Adding seed node ${endpoint}", ("endpoint", endpoint));
4367  add_node(endpoint);
4368  }
4369  }
4370 
4372  {
4373  new_peer->get_socket().open();
4374  new_peer->get_socket().set_reuse_address();
4375  new_peer->connection_initiation_time = fc::time_point::now();
4376  _handshaking_connections.insert(new_peer);
4377  _rate_limiter.add_tcp_socket(&new_peer->get_socket());
4378 
4380  return;
4381 
4382  std::weak_ptr<peer_connection> new_weak_peer(new_peer);
4383  new_peer->accept_or_connect_task_done = fc::async([this, new_weak_peer](){
4384  peer_connection_ptr new_peer(new_weak_peer.lock());
4385  assert(new_peer);
4386  if (!new_peer)
4387  return;
4388  connect_to_task(new_peer, *new_peer->get_remote_endpoint());
4389  }, "connect_to_task");
4390  }
4391 
4393  {
4395  if (is_connection_to_endpoint_in_progress(remote_endpoint))
4396  FC_THROW_EXCEPTION(already_connected_to_requested_peer, "already connected to requested endpoint ${endpoint}",
4397  ("endpoint", remote_endpoint));
4398 
4399  dlog("node_impl::connect_to_endpoint(${endpoint})", ("endpoint", remote_endpoint));
4401  new_peer->set_remote_endpoint(remote_endpoint);
4402  initiate_connect_to(new_peer);
4403  }
4404 
4406  {
4408  {
4410  for( const peer_connection_ptr& active_peer : _active_connections )
4411  {
4412  fc::optional<fc::ip::endpoint> endpoint_for_this_peer( active_peer->get_remote_endpoint() );
4413  if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint )
4414  return active_peer;
4415  }
4416  }
4417  {
4419  for( const peer_connection_ptr& handshaking_peer : _handshaking_connections )
4420  {
4421  fc::optional<fc::ip::endpoint> endpoint_for_this_peer( handshaking_peer->get_remote_endpoint() );
4422  if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint )
4423  return handshaking_peer;
4424  }
4425  }
4426  return peer_connection_ptr();
4427  }
4428 
4430  {
4432  return get_connection_to_endpoint( remote_endpoint ) != peer_connection_ptr();
4433  }
4434 
4436  {
4442  }
4443 
4445  {
4447  _active_connections.erase(peer);
4451  }
4452 
4454  {
4456  _active_connections.erase(peer);
4460  }
4461 
4463  {
4465  ilog( "----------------- PEER STATUS UPDATE --------------------" );
4466  ilog( " number of peers: ${active} active, ${handshaking}, ${closing} closing. attempting to maintain ${desired} - ${maximum} peers",
4467  ( "active", _active_connections.size() )("handshaking", _handshaking_connections.size() )("closing",_closing_connections.size() )
4468  ( "desired", _desired_number_of_connections )("maximum", _maximum_number_of_connections ) );
4469  {
4471  for( const peer_connection_ptr& peer : _active_connections )
4472  {
4473  ilog( " active peer ${endpoint} peer_is_in_sync_with_us:${in_sync_with_us} we_are_in_sync_with_peer:${in_sync_with_them}",
4474  ( "endpoint", peer->get_remote_endpoint() )
4475  ( "in_sync_with_us", !peer->peer_needs_sync_items_from_us )("in_sync_with_them", !peer->we_need_sync_items_from_peer ) );
4476  if( peer->we_need_sync_items_from_peer )
4477  ilog( " above peer has ${count} sync items we might need", ("count", peer->ids_of_items_to_get.size() ) );
4478  if (peer->inhibit_fetching_sync_blocks)
4479  ilog( " we are not fetching sync blocks from the above peer (inhibit_fetching_sync_blocks == true)" );
4480 
4481  }
4482  }
4483  {
4485  for( const peer_connection_ptr& peer : _handshaking_connections )
4486  {
4487  ilog( " handshaking peer ${endpoint} in state ours(${our_state}) theirs(${their_state})",
4488  ( "endpoint", peer->get_remote_endpoint() )("our_state", peer->our_state )("their_state", peer->their_state ) );
4489  }
4490  }
4491  ilog( "--------- MEMORY USAGE ------------" );
4492  ilog( "node._active_sync_requests size: ${size}", ("size", _active_sync_requests.size() ) );
4493  ilog( "node._received_sync_items size: ${size}", ("size", _received_sync_items.size() ) );
4494  ilog( "node._new_received_sync_items size: ${size}", ("size", _new_received_sync_items.size() ) );
4495  ilog( "node._items_to_fetch size: ${size}", ("size", _items_to_fetch.size() ) );
4496  ilog( "node._new_inventory size: ${size}", ("size", _new_inventory.size() ) );
4497  ilog( "node._message_cache size: ${size}", ("size", _message_cache.size() ) );
4499  for( const peer_connection_ptr& peer : _active_connections )
4500  {
4501  ilog( " peer ${endpoint}", ("endpoint", peer->get_remote_endpoint() ) );
4502  ilog( " peer.ids_of_items_to_get size: ${size}", ("size", peer->ids_of_items_to_get.size() ) );
4503  ilog( " peer.inventory_peer_advertised_to_us size: ${size}", ("size", peer->inventory_peer_advertised_to_us.size() ) );
4504  ilog( " peer.inventory_advertised_to_peer size: ${size}", ("size", peer->inventory_advertised_to_peer.size() ) );
4505  ilog( " peer.items_requested_from_peer size: ${size}", ("size", peer->items_requested_from_peer.size() ) );
4506  ilog( " peer.sync_items_requested_from_peer size: ${size}", ("size", peer->sync_items_requested_from_peer.size() ) );
4507  }
4508  ilog( "--------- END MEMORY USAGE ------------" );
4509  }
4510 
4512  const std::string& reason_for_disconnect,
4513  bool caused_by_error /* = false */,
4514  const fc::oexception& error /* = fc::oexception() */ )
4515  {
4517  move_peer_to_closing_list(peer_to_disconnect->shared_from_this());
4518 
4519  if (peer_to_disconnect->they_have_requested_close)
4520  {
4521  // the peer has already told us that it's ready to close the connection, so just close the connection
4522  peer_to_disconnect->close_connection();
4523  }
4524  else
4525  {
4526  // we're the first to try to want to close the connection
4527  fc::optional<fc::ip::endpoint> inbound_endpoint = peer_to_disconnect->get_endpoint_for_connecting();
4528  if (inbound_endpoint)
4529  {
4530  fc::optional<potential_peer_record> updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint);
4531  if (updated_peer_record)
4532  {
4533  updated_peer_record->last_seen_time = fc::time_point::now();
4534  if (error)
4535  updated_peer_record->last_error = error;
4536  else
4537  updated_peer_record->last_error = fc::exception(FC_LOG_MESSAGE(info, reason_for_disconnect.c_str()));
4538  _potential_peer_db.update_entry(*updated_peer_record);
4539  }
4540  }
4541  peer_to_disconnect->we_have_requested_close = true;
4542  peer_to_disconnect->connection_closed_time = fc::time_point::now();
4543 
4544  closing_connection_message closing_message( reason_for_disconnect, caused_by_error, error );
4545  peer_to_disconnect->send_message( closing_message );
4546  }
4547 
4548  // notify the user. This will be useful in testing, but we might want to remove it later.
4549  // It makes good sense to notify the user if other nodes think she is behaving badly, but
4550  // if we're just detecting and dissconnecting other badly-behaving nodes, they don't really care.
4551  if (caused_by_error)
4552  {
4553  std::ostringstream error_message;
4554  error_message << "I am disconnecting peer " << fc::variant( peer_to_disconnect->get_remote_endpoint(), GRAPHENE_NET_MAX_NESTED_OBJECTS ).as_string() <<
4555  " for reason: " << reason_for_disconnect;
4556  _delegate->error_encountered(error_message.str(), fc::oexception());
4557  dlog(error_message.str());
4558  }
4559  else
4560  dlog("Disconnecting from ${peer} for ${reason}", ("peer",peer_to_disconnect->get_remote_endpoint()) ("reason",reason_for_disconnect));
4561  }
4562 
4563  void node_impl::listen_on_endpoint( const fc::ip::endpoint& ep, bool wait_if_not_available )
4564  {
4567  _node_configuration.wait_if_endpoint_is_busy = wait_if_not_available;
4569  }
4570 
4572  {
4576  }
4577 
4578  void node_impl::listen_on_port( uint16_t port, bool wait_if_not_available )
4579  {
4582  _node_configuration.wait_if_endpoint_is_busy = wait_if_not_available;
4584  }
4585 
4587  {
4590  }
4591 
4592  std::vector<peer_status> node_impl::get_connected_peers() const
4593  {
4595  std::vector<peer_status> statuses;
4597  for (const peer_connection_ptr& peer : _active_connections)
4598  {
4599  peer_status this_peer_status;
4600  this_peer_status.version = 0;
4601  fc::optional<fc::ip::endpoint> endpoint = peer->get_remote_endpoint();
4602  if (endpoint)
4603  this_peer_status.host = *endpoint;
4604  fc::mutable_variant_object peer_details;
4605  peer_details["addr"] = endpoint ? (std::string)*endpoint : std::string();
4606  peer_details["addrlocal"] = (std::string)peer->get_local_endpoint();
4607  peer_details["services"] = "00000001";
4608  peer_details["lastsend"] = peer->get_last_message_sent_time().sec_since_epoch();
4609  peer_details["lastrecv"] = peer->get_last_message_received_time().sec_since_epoch();
4610  peer_details["bytessent"] = peer->get_total_bytes_sent();
4611  peer_details["bytesrecv"] = peer->get_total_bytes_received();
4612  peer_details["conntime"] = peer->get_connection_time();
4613  peer_details["pingtime"] = "";
4614  peer_details["pingwait"] = "";
4615  peer_details["version"] = "";
4616  peer_details["subver"] = peer->user_agent;
4617  peer_details["inbound"] = peer->direction == peer_connection_direction::inbound;
4618  peer_details["firewall_status"] = fc::variant( peer->is_firewalled, 1 );
4619  peer_details["startingheight"] = "";
4620  peer_details["banscore"] = "";
4621  peer_details["syncnode"] = "";
4622 
4623  if (peer->fc_git_revision_sha)
4624  {
4625  std::string revision_string = *peer->fc_git_revision_sha;
4626  if (*peer->fc_git_revision_sha == fc::git_revision_sha)
4627  revision_string += " (same as ours)";
4628  else
4629  revision_string += " (different from ours)";
4630  peer_details["fc_git_revision_sha"] = revision_string;
4631 
4632  }
4633  if (peer->fc_git_revision_unix_timestamp)
4634  {
4635  peer_details["fc_git_revision_unix_timestamp"] = *peer->fc_git_revision_unix_timestamp;
4636  std::string age_string = fc::get_approximate_relative_time_string( *peer->fc_git_revision_unix_timestamp);
4637  if (*peer->fc_git_revision_unix_timestamp == fc::time_point_sec(fc::git_revision_unix_timestamp))
4638  age_string += " (same as ours)";
4639  else if (*peer->fc_git_revision_unix_timestamp > fc::time_point_sec(fc::git_revision_unix_timestamp))
4640  age_string += " (newer than ours)";
4641  else
4642  age_string += " (older than ours)";
4643  peer_details["fc_git_revision_age"] = age_string;
4644  }
4645 
4646  if (peer->platform)
4647  peer_details["platform"] = *peer->platform;
4648 
4649  // provide these for debugging
4650  // warning: these are just approximations, if the peer is "downstream" of us, they may
4651  // have received blocks from other peers that we are unaware of
4652  peer_details["current_head_block"] = fc::variant( peer->last_block_delegate_has_seen, 1 );
4653  peer_details["current_head_block_number"] = _delegate->get_block_number(peer->last_block_delegate_has_seen);
4654  peer_details["current_head_block_time"] = peer->last_block_time_delegate_has_seen;
4655 
4656  this_peer_status.info = peer_details;
4657  statuses.push_back(this_peer_status);
4658  }
4659  return statuses;
4660  }
4661 
4663  {
4665  return (uint32_t)_active_connections.size();
4666  }
4667 
4668  void node_impl::broadcast( const message& item_to_broadcast, const message_propagation_data& propagation_data )
4669  {
4671  fc::uint160_t hash_of_message_contents;
4672  if( item_to_broadcast.msg_type.value() == graphene::net::block_message_type )
4673  {
4674  graphene::net::block_message block_message_to_broadcast = item_to_broadcast.as<graphene::net::block_message>();
4675  hash_of_message_contents = block_message_to_broadcast.block_id; // for debugging
4676  _most_recent_blocks_accepted.push_back( block_message_to_broadcast.block_id );
4677  }
4678  else if( item_to_broadcast.msg_type.value() == graphene::net::trx_message_type )
4679  {
4680  graphene::net::trx_message transaction_message_to_broadcast = item_to_broadcast.as<graphene::net::trx_message>();
4681  hash_of_message_contents = transaction_message_to_broadcast.trx.id(); // for debugging
4682  dlog( "broadcasting trx: ${trx}", ("trx", transaction_message_to_broadcast) );
4683  }
4684  message_hash_type hash_of_item_to_broadcast = item_to_broadcast.id();
4685 
4686  _message_cache.cache_message( item_to_broadcast, hash_of_item_to_broadcast, propagation_data, hash_of_message_contents );
4687  _new_inventory.insert( item_id(item_to_broadcast.msg_type.value(), hash_of_item_to_broadcast ) );
4689  }
4690 
4691  void node_impl::broadcast( const message& item_to_broadcast )
4692  {
4694  // this version is called directly from the client
4696  broadcast( item_to_broadcast, propagation_data );
4697  }
4698 
4699  void node_impl::sync_from(const item_id& current_head_block, const std::vector<uint32_t>& hard_fork_block_numbers)
4700  {
4703  _sync_item_type = current_head_block.item_type;
4704  _most_recent_blocks_accepted.push_back(current_head_block.item_hash);
4705  _hard_fork_block_numbers = hard_fork_block_numbers;
4706  }
4707 
4709  {
4711  return !_active_connections.empty();
4712  }
4713 
4714  std::vector<potential_peer_record> node_impl::get_potential_peers() const
4715  {
4717  std::vector<potential_peer_record> result;
4718  // use explicit iterators here, for some reason the mac compiler can't used ranged-based for loops here
4720  result.push_back(*itr);
4721  return result;
4722  }
4723 
4725  {
4727  if (params.contains("peer_connection_retry_timeout"))
4728  _peer_connection_retry_timeout = params["peer_connection_retry_timeout"].as<uint32_t>(1);
4729  if (params.contains("desired_number_of_connections"))
4730  _desired_number_of_connections = params["desired_number_of_connections"].as<uint32_t>(1);
4731  if (params.contains("maximum_number_of_connections"))
4732  _maximum_number_of_connections = params["maximum_number_of_connections"].as<uint32_t>(1);
4733  if (params.contains("maximum_number_of_blocks_to_handle_at_one_time"))
4734  _maximum_number_of_blocks_to_handle_at_one_time = params["maximum_number_of_blocks_to_handle_at_one_time"].as<uint32_t>(1);
4735  if (params.contains("maximum_number_of_sync_blocks_to_prefetch"))
4736  _maximum_number_of_sync_blocks_to_prefetch = params["maximum_number_of_sync_blocks_to_prefetch"].as<uint32_t>(1);
4737  if (params.contains("maximum_blocks_per_peer_during_syncing"))
4738  _maximum_blocks_per_peer_during_syncing = params["maximum_blocks_per_peer_during_syncing"].as<uint32_t>(1);
4739 
4741 
4744  "I have too many connections open");
4746  }
4747 
4749  {
4752  result["peer_connection_retry_timeout"] = _peer_connection_retry_timeout;
4753  result["desired_number_of_connections"] = _desired_number_of_connections;
4754  result["maximum_number_of_connections"] = _maximum_number_of_connections;
4755  result["maximum_number_of_blocks_to_handle_at_one_time"] = _maximum_number_of_blocks_to_handle_at_one_time;
4756  result["maximum_number_of_sync_blocks_to_prefetch"] = _maximum_number_of_sync_blocks_to_prefetch;
4757  result["maximum_blocks_per_peer_during_syncing"] = _maximum_blocks_per_peer_during_syncing;
4758  return result;
4759  }
4760 
4762  {
4764  return _message_cache.get_message_propagation_data( transaction_id );
4765  }
4766 
4768  {
4770  return _message_cache.get_message_propagation_data( block_id );
4771  }
4772 
4774  {
4776  return _node_id;
4777  }
4778  void node_impl::set_allowed_peers(const std::vector<node_id_t>& allowed_peers)
4779  {
4781 #ifdef ENABLE_P2P_DEBUGGING_API
4782  _allowed_peers.clear();
4783  _allowed_peers.insert(allowed_peers.begin(), allowed_peers.end());
4784  std::list<peer_connection_ptr> peers_to_disconnect;
4785  if (!_allowed_peers.empty())
4786  {
4788  for (const peer_connection_ptr& peer : _active_connections)
4789  if (_allowed_peers.find(peer->node_id) == _allowed_peers.end())
4790  peers_to_disconnect.push_back(peer);
4791  }
4792