BitShares-Core  4.0.0
BitShares blockchain implementation and command-line interface software
node.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Cryptonomex, Inc., and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 #include <sstream>
25 #include <iomanip>
26 #include <deque>
27 #include <unordered_set>
28 #include <list>
29 #include <forward_list>
30 #include <iostream>
31 #include <algorithm>
32 #include <tuple>
33 #include <string>
34 #include <boost/tuple/tuple.hpp>
35 #include <boost/circular_buffer.hpp>
36 
37 #include <boost/multi_index_container.hpp>
38 #include <boost/multi_index/ordered_index.hpp>
39 #include <boost/multi_index/mem_fun.hpp>
40 #include <boost/multi_index/member.hpp>
41 #include <boost/multi_index/random_access_index.hpp>
42 #include <boost/multi_index/tag.hpp>
43 #include <boost/multi_index/sequenced_index.hpp>
44 #include <boost/multi_index/hashed_index.hpp>
45 #include <boost/logic/tribool.hpp>
46 #include <boost/range/algorithm_ext/push_back.hpp>
47 #include <boost/range/algorithm/find.hpp>
48 #include <boost/range/numeric.hpp>
49 
50 #include <boost/accumulators/accumulators.hpp>
51 #include <boost/accumulators/statistics/stats.hpp>
52 #include <boost/accumulators/statistics/rolling_mean.hpp>
53 #include <boost/accumulators/statistics/min.hpp>
54 #include <boost/accumulators/statistics/max.hpp>
55 #include <boost/accumulators/statistics/sum.hpp>
56 #include <boost/accumulators/statistics/count.hpp>
57 
58 #include <boost/preprocessor/seq/for_each.hpp>
59 #include <boost/preprocessor/cat.hpp>
60 #include <boost/preprocessor/stringize.hpp>
61 
62 #include <fc/thread/thread.hpp>
63 #include <fc/thread/future.hpp>
65 #include <fc/thread/mutex.hpp>
67 #include <fc/log/logger.hpp>
68 #include <fc/io/json.hpp>
69 #include <fc/io/enum_type.hpp>
70 #include <fc/io/raw.hpp>
71 #include <fc/crypto/rand.hpp>
73 #include <fc/network/ip.hpp>
74 #include <fc/network/resolve.hpp>
75 
76 #include <graphene/net/node.hpp>
80 #include <graphene/net/config.hpp>
82 
85 // Nasty hack: A circular dependency around fee_schedule is resolved by fwd-declaring it and using a shared_ptr
86 // to it in chain_parameters, which is used in an operation and thus must be serialized by the net library.
87 // Resolving that forward declaration doesn't happen until now:
89 
90 #include <fc/git_revision.hpp>
91 
92 //#define ENABLE_DEBUG_ULOGS
93 
94 #ifdef DEFAULT_LOGGER
95 # undef DEFAULT_LOGGER
96 #endif
97 #define DEFAULT_LOGGER "p2p"
98 
99 #define P2P_IN_DEDICATED_THREAD 1
100 
101 #define INVOCATION_COUNTER(name) \
102  static unsigned total_ ## name ## _counter = 0; \
103  static unsigned active_ ## name ## _counter = 0; \
104  struct name ## _invocation_logger { \
105  unsigned *total; \
106  unsigned *active; \
107  name ## _invocation_logger(unsigned *total, unsigned *active) : \
108  total(total), active(active) \
109  { \
110  ++*total; \
111  ++*active; \
112  dlog("NEWDEBUG: Entering " #name ", now ${total} total calls, ${active} active calls", ("total", *total)("active", *active)); \
113  } \
114  ~name ## _invocation_logger() \
115  { \
116  --*active; \
117  dlog("NEWDEBUG: Leaving " #name ", now ${total} total calls, ${active} active calls", ("total", *total)("active", *active)); \
118  } \
119  } invocation_logger(&total_ ## name ## _counter, &active_ ## name ## _counter)
120 
121 //log these messages even at warn level when operating on the test network
122 #ifdef GRAPHENE_TEST_NETWORK
123 #define testnetlog wlog
124 #else
125 #define testnetlog(...) do {} while (0)
126 #endif
127 
128 namespace graphene { namespace net {
129 
130  namespace detail
131  {
132  namespace bmi = boost::multi_index;
134  {
135  private:
136  static const uint32_t cache_duration_in_blocks = GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS;
137 
138  struct message_hash_index{};
139  struct message_contents_hash_index{};
140  struct block_clock_index{};
141  struct message_info
142  {
143  message_hash_type message_hash;
144  message message_body;
145  uint32_t block_clock_when_received;
146 
147  // for network performance stats
148  message_propagation_data propagation_data;
149  fc::uint160_t message_contents_hash; // hash of whatever the message contains (if it's a transaction, this is the transaction id, if it's a block, it's the block_id)
150 
151  message_info( const message_hash_type& message_hash,
152  const message& message_body,
153  uint32_t block_clock_when_received,
154  const message_propagation_data& propagation_data,
155  fc::uint160_t message_contents_hash ) :
156  message_hash( message_hash ),
157  message_body( message_body ),
158  block_clock_when_received( block_clock_when_received ),
159  propagation_data( propagation_data ),
160  message_contents_hash( message_contents_hash )
161  {}
162  };
163  typedef boost::multi_index_container
164  < message_info,
165  bmi::indexed_by< bmi::ordered_unique< bmi::tag<message_hash_index>,
166  bmi::member<message_info, message_hash_type, &message_info::message_hash> >,
167  bmi::ordered_non_unique< bmi::tag<message_contents_hash_index>,
168  bmi::member<message_info, fc::uint160_t, &message_info::message_contents_hash> >,
169  bmi::ordered_non_unique< bmi::tag<block_clock_index>,
170  bmi::member<message_info, uint32_t, &message_info::block_clock_when_received> > >
171  > message_cache_container;
172 
173  message_cache_container _message_cache;
174 
175  uint32_t block_clock;
176 
177  public:
179  block_clock( 0 )
180  {}
181  void block_accepted();
182  void cache_message( const message& message_to_cache, const message_hash_type& hash_of_message_to_cache,
183  const message_propagation_data& propagation_data, const fc::uint160_t& message_content_hash );
184  message get_message( const message_hash_type& hash_of_message_to_lookup );
185  message_propagation_data get_message_propagation_data( const fc::uint160_t& hash_of_message_contents_to_lookup ) const;
186  size_t size() const { return _message_cache.size(); }
187  };
188 
190  {
191  ++block_clock;
192  if( block_clock > cache_duration_in_blocks )
193  _message_cache.get<block_clock_index>().erase(_message_cache.get<block_clock_index>().begin(),
194  _message_cache.get<block_clock_index>().lower_bound(block_clock - cache_duration_in_blocks ) );
195  }
196 
198  const message_hash_type& hash_of_message_to_cache,
199  const message_propagation_data& propagation_data,
200  const fc::uint160_t& message_content_hash )
201  {
202  _message_cache.insert( message_info(hash_of_message_to_cache,
203  message_to_cache,
204  block_clock,
205  propagation_data,
206  message_content_hash ) );
207  }
208 
210  {
211  message_cache_container::index<message_hash_index>::type::const_iterator iter =
212  _message_cache.get<message_hash_index>().find(hash_of_message_to_lookup );
213  if( iter != _message_cache.get<message_hash_index>().end() )
214  return iter->message_body;
215  FC_THROW_EXCEPTION( fc::key_not_found_exception, "Requested message not in cache" );
216  }
217 
219  {
220  if( hash_of_message_contents_to_lookup != fc::uint160_t() )
221  {
222  message_cache_container::index<message_contents_hash_index>::type::const_iterator iter =
223  _message_cache.get<message_contents_hash_index>().find(hash_of_message_contents_to_lookup );
224  if( iter != _message_cache.get<message_contents_hash_index>().end() )
225  return iter->propagation_data;
226  }
227  FC_THROW_EXCEPTION( fc::key_not_found_exception, "Requested message not in cache" );
228  }
229 
231 
232  // This specifies configuration info for the local node. It's stored as JSON
233  // in the configuration directory (application data directory)
235  {
236  node_configuration() : accept_incoming_connections(true), wait_if_endpoint_is_busy(true) {}
237 
248  };
249 
250 
251 } } } // end namespace graphene::net::detail
253  (accept_incoming_connections)
255  (private_key));
256 
257 #include "node_impl.hxx"
258 
259 namespace graphene { namespace net { namespace detail {
260 
262  {
263 #ifdef P2P_IN_DEDICATED_THREAD
264  std::weak_ptr<fc::thread> weak_thread;
265  if (impl_to_delete)
266  {
267  std::shared_ptr<fc::thread> impl_thread(impl_to_delete->_thread);
268  weak_thread = impl_thread;
269  impl_thread->async([impl_to_delete](){ delete impl_to_delete; }, "delete node_impl").wait();
270  dlog("deleting the p2p thread");
271  }
272  if (weak_thread.expired())
273  dlog("done deleting the p2p thread");
274  else
275  dlog("failed to delete the p2p thread, we must be leaking a smart pointer somewhere");
276 #else // P2P_IN_DEDICATED_THREAD
277  delete impl_to_delete;
278 #endif // P2P_IN_DEDICATED_THREAD
279  }
280 
281 #ifdef P2P_IN_DEDICATED_THREAD
282 # define VERIFY_CORRECT_THREAD() assert(_thread->is_current())
283 #else
284 # define VERIFY_CORRECT_THREAD() do {} while (0)
285 #endif
286 
287 #define MAXIMUM_NUMBER_OF_BLOCKS_TO_HANDLE_AT_ONE_TIME 200
288 #define MAXIMUM_NUMBER_OF_BLOCKS_TO_PREFETCH (10 * MAXIMUM_NUMBER_OF_BLOCKS_TO_HANDLE_AT_ONE_TIME)
289 
290  node_impl::node_impl(const std::string& user_agent) :
292  _thread(std::make_shared<fc::thread>("p2p")),
293 #endif // P2P_IN_DEDICATED_THREAD
294  _delegate(nullptr),
295  _is_firewalled(firewalled_state::unknown),
296  _potential_peer_database_updated(false),
297  _sync_items_to_fetch_updated(false),
298  _suspend_fetching_sync_blocks(false),
299  _items_to_fetch_updated(false),
300  _items_to_fetch_sequence_counter(0),
301  _recent_block_interval_in_seconds(GRAPHENE_MAX_BLOCK_INTERVAL),
302  _user_agent_string(user_agent),
303  _desired_number_of_connections(GRAPHENE_NET_DEFAULT_DESIRED_CONNECTIONS),
304  _maximum_number_of_connections(GRAPHENE_NET_DEFAULT_MAX_CONNECTIONS),
305  _peer_connection_retry_timeout(GRAPHENE_NET_DEFAULT_PEER_CONNECTION_RETRY_TIME),
306  _peer_inactivity_timeout(GRAPHENE_NET_PEER_HANDSHAKE_INACTIVITY_TIMEOUT),
307  _most_recent_blocks_accepted(_maximum_number_of_connections),
308  _total_number_of_unfetched_items(0),
309  _rate_limiter(0, 0),
310  _last_reported_number_of_connections(0),
311  _peer_advertising_disabled(false),
312  _average_network_read_speed_seconds(60),
313  _average_network_write_speed_seconds(60),
314  _average_network_read_speed_minutes(60),
315  _average_network_write_speed_minutes(60),
316  _average_network_read_speed_hours(72),
317  _average_network_write_speed_hours(72),
318  _average_network_usage_second_counter(0),
319  _average_network_usage_minute_counter(0),
320  _node_is_shutting_down(false),
321  _maximum_number_of_blocks_to_handle_at_one_time(MAXIMUM_NUMBER_OF_BLOCKS_TO_HANDLE_AT_ONE_TIME),
322  _maximum_number_of_sync_blocks_to_prefetch(MAXIMUM_NUMBER_OF_BLOCKS_TO_PREFETCH),
323  _maximum_blocks_per_peer_during_syncing(GRAPHENE_NET_MAX_BLOCKS_PER_PEER_DURING_SYNCING)
324  {
326  fc::rand_bytes((char*) _node_id.data(), (int)_node_id.size());
327  }
328 
330  {
332  ilog( "cleaning up node" );
333  _node_is_shutting_down = true;
334 
335  for (const peer_connection_ptr& active_peer : _active_connections)
336  {
337  fc::optional<fc::ip::endpoint> inbound_endpoint = active_peer->get_endpoint_for_connecting();
338  if (inbound_endpoint)
339  {
341  if (updated_peer_record)
342  {
343  updated_peer_record->last_seen_time = fc::time_point::now();
344  _potential_peer_db.update_entry(*updated_peer_record);
345  }
346  }
347  }
348 
349  try
350  {
351  ilog( "close" );
352  close();
353  }
354  catch ( const fc::exception& e )
355  {
356  wlog( "unexpected exception on close ${e}", ("e", e) );
357  }
358  ilog( "done" );
359  }
360 
362  {
365  {
367  try
368  {
369  fc::json::save_to_file( _node_configuration, configuration_file_name );
370  }
371  catch (const fc::canceled_exception&)
372  {
373  throw;
374  }
375  catch ( const fc::exception& except )
376  {
377  elog( "error writing node configuration to file ${filename}: ${error}",
378  ( "filename", configuration_file_name )("error", except.to_detail_string() ) );
379  }
380  }
381  }
382 
384  {
387  {
388  try
389  {
390  dlog("Starting an iteration of p2p_network_connect_loop().");
392 
393  // add-once peers bypass our checks on the maximum/desired number of connections (but they will still be counted against the totals once they're connected)
394  if (!_add_once_node_list.empty())
395  {
396  std::list<potential_peer_record> add_once_node_list;
397  add_once_node_list.swap(_add_once_node_list);
398  dlog("Processing \"add once\" node list containing ${count} peers:", ("count", add_once_node_list.size()));
399  for (const potential_peer_record& add_once_peer : add_once_node_list)
400  {
401  dlog(" ${peer}", ("peer", add_once_peer.endpoint));
402  }
403  for (const potential_peer_record& add_once_peer : add_once_node_list)
404  {
405  // see if we have an existing connection to that peer. If we do, disconnect them and
406  // then try to connect the next time through the loop
407  peer_connection_ptr existing_connection_ptr = get_connection_to_endpoint( add_once_peer.endpoint );
408  if(!existing_connection_ptr)
409  connect_to_endpoint(add_once_peer.endpoint);
410  }
411  dlog("Done processing \"add once\" node list");
412  }
413 
415  {
416  bool initiated_connection_this_pass = false;
418 
421  ++iter)
422  {
423  fc::microseconds delay_until_retry = fc::seconds((iter->number_of_failed_connection_attempts + 1) * _peer_connection_retry_timeout);
424 
425  if (!is_connection_to_endpoint_in_progress(iter->endpoint) &&
426  ((iter->last_connection_disposition != last_connection_failed &&
427  iter->last_connection_disposition != last_connection_rejected &&
428  iter->last_connection_disposition != last_connection_handshaking_failed) ||
429  (fc::time_point::now() - iter->last_connection_attempt_time) > delay_until_retry))
430  {
431  connect_to_endpoint(iter->endpoint);
432  initiated_connection_this_pass = true;
433  }
434  }
435 
436  if (!initiated_connection_this_pass && !_potential_peer_database_updated)
437  break;
438  }
439 
441 
442  // if we broke out of the while loop, that means either we have connected to enough nodes, or
443  // we don't have any good candidates to connect to right now.
444 #if 0
445  try
446  {
447  _retrigger_connect_loop_promise = fc::promise<void>::create("graphene::net::retrigger_connect_loop");
449  {
451  dlog( "Still want to connect to more nodes, but I don't have any good candidates. Trying again in 15 seconds" );
452  else
453  dlog( "I still have some \"add once\" nodes to connect to. Trying again in 15 seconds" );
455  }
456  else
457  {
458  dlog( "I don't need any more connections, waiting forever until something changes" );
460  }
461  }
462  catch ( fc::timeout_exception& ) //intentionally not logged
463  {
464  } // catch
465 #else
466  fc::usleep(fc::seconds(10));
467 #endif
468  }
469  catch (const fc::canceled_exception&)
470  {
471  throw;
472  }
473  FC_CAPTURE_AND_LOG( (0) )
474  }// while(!canceled)
475  }
476 
478  {
480  dlog( "Triggering connect loop now" );
482  //if( _retrigger_connect_loop_promise )
483  // _retrigger_connect_loop_promise->set_value();
484  }
485 
487  {
489 
490  try
491  {
492  dlog("Starting an iteration of update_seed_nodes loop.");
493  for( const std::string& endpoint_string : _seed_nodes )
494  {
495  resolve_seed_node_and_add( endpoint_string );
496  }
497  dlog("Done an iteration of update_seed_nodes loop.");
498  }
499  catch (const fc::canceled_exception&)
500  {
501  throw;
502  }
504 
506  }
507 
509  {
511 
513  return;
514 
516  return;
517 
520  "update_seed_nodes_loop" );
521  }
522 
524  {
526  return std::find_if(_received_sync_items.begin(), _received_sync_items.end(),
527  [&item_hash]( const graphene::net::block_message& message ) { return message.block_id == item_hash; } ) != _received_sync_items.end() ||
528  std::find_if(_new_received_sync_items.begin(), _new_received_sync_items.end(),
529  [&item_hash]( const graphene::net::block_message& message ) { return message.block_id == item_hash; } ) != _new_received_sync_items.end(); ;
530  }
531 
532  void node_impl::request_sync_item_from_peer( const peer_connection_ptr& peer, const item_hash_t& item_to_request )
533  {
535  dlog( "requesting item ${item_hash} from peer ${endpoint}", ("item_hash", item_to_request )("endpoint", peer->get_remote_endpoint() ) );
536  item_id item_id_to_request( graphene::net::block_message_type, item_to_request );
537  _active_sync_requests.insert( active_sync_requests_map::value_type(item_to_request, fc::time_point::now() ) );
538  peer->last_sync_item_received_time = fc::time_point::now();
539  peer->sync_items_requested_from_peer.insert(item_to_request);
540  peer->send_message( fetch_items_message(item_id_to_request.item_type, std::vector<item_hash_t>{item_id_to_request.item_hash} ) );
541  }
542 
543  void node_impl::request_sync_items_from_peer( const peer_connection_ptr& peer, const std::vector<item_hash_t>& items_to_request )
544  {
546  dlog( "requesting ${item_count} item(s) ${items_to_request} from peer ${endpoint}",
547  ("item_count", items_to_request.size())("items_to_request", items_to_request)("endpoint", peer->get_remote_endpoint()) );
548  for (const item_hash_t& item_to_request : items_to_request)
549  {
550  _active_sync_requests.insert( active_sync_requests_map::value_type(item_to_request, fc::time_point::now() ) );
551  peer->last_sync_item_received_time = fc::time_point::now();
552  peer->sync_items_requested_from_peer.insert(item_to_request);
553  }
554  peer->send_message(fetch_items_message(graphene::net::block_message_type, items_to_request));
555  }
556 
558  {
561  {
563  dlog( "beginning another iteration of the sync items loop" );
564 
566  {
567  std::map<peer_connection_ptr, std::vector<item_hash_t> > sync_item_requests_to_send;
568 
569  {
571  std::set<item_hash_t> sync_items_to_request;
572 
573  // for each idle peer that we're syncing with
574  for( const peer_connection_ptr& peer : _active_connections )
575  {
576  if( peer->we_need_sync_items_from_peer &&
577  sync_item_requests_to_send.find(peer) == sync_item_requests_to_send.end() && // if we've already scheduled a request for this peer, don't consider scheduling another
578  peer->idle() )
579  {
580  if (!peer->inhibit_fetching_sync_blocks)
581  {
582  // loop through the items it has that we don't yet have on our blockchain
583  for( unsigned i = 0; i < peer->ids_of_items_to_get.size(); ++i )
584  {
585  item_hash_t item_to_potentially_request = peer->ids_of_items_to_get[i];
586  // if we don't already have this item in our temporary storage and we haven't requested from another syncing peer
587  if( !have_already_received_sync_item(item_to_potentially_request) && // already got it, but for some reson it's still in our list of items to fetch
588  sync_items_to_request.find(item_to_potentially_request) == sync_items_to_request.end() && // we have already decided to request it from another peer during this iteration
589  _active_sync_requests.find(item_to_potentially_request) == _active_sync_requests.end() ) // we've requested it in a previous iteration and we're still waiting for it to arrive
590  {
591  // then schedule a request from this peer
592  sync_item_requests_to_send[peer].push_back(item_to_potentially_request);
593  sync_items_to_request.insert( item_to_potentially_request );
594  if (sync_item_requests_to_send[peer].size() >= _maximum_blocks_per_peer_during_syncing)
595  break;
596  }
597  }
598  }
599  }
600  }
601  } // end non-preemptable section
602 
603  // make all the requests we scheduled in the loop above
604  for( auto sync_item_request : sync_item_requests_to_send )
605  request_sync_items_from_peer( sync_item_request.first, sync_item_request.second );
606  sync_item_requests_to_send.clear();
607  }
608  else
609  dlog("fetch_sync_items_loop is suspended pending backlog processing");
610 
612  {
613  dlog( "no sync items to fetch right now, going to sleep" );
614  _retrigger_fetch_sync_items_loop_promise = fc::promise<void>::create("graphene::net::retrigger_fetch_sync_items_loop");
617  }
618  } // while( !canceled )
619  }
620 
622  {
624  dlog( "Triggering fetch sync items loop now" );
628  }
629 
631  {
632  for( const peer_connection_ptr& peer : _active_connections )
633  {
634  if (peer->inventory_peer_advertised_to_us.find(item) != peer->inventory_peer_advertised_to_us.end() )
635  return true;
636  }
637  return false;
638  }
639 
641  {
644  {
645  _items_to_fetch_updated = false;
646  dlog("beginning an iteration of fetch items (${count} items to fetch)",
647  ("count", _items_to_fetch.size()));
648 
650  fc::time_point next_peer_unblocked_time = fc::time_point::maximum();
651 
652  // we need to construct a list of items to request from each peer first,
653  // then send the messages (in two steps, to avoid yielding while iterating)
654  // we want to evenly distribute our requests among our peers.
655  struct requested_item_count_index {};
656  struct peer_and_items_to_fetch
657  {
658  peer_connection_ptr peer;
659  std::vector<item_id> item_ids;
660  peer_and_items_to_fetch(const peer_connection_ptr& peer) : peer(peer) {}
661  bool operator<(const peer_and_items_to_fetch& rhs) const { return peer < rhs.peer; }
662  size_t number_of_items() const { return item_ids.size(); }
663  };
664  typedef boost::multi_index_container<peer_and_items_to_fetch,
665  boost::multi_index::indexed_by<boost::multi_index::ordered_unique<boost::multi_index::member<peer_and_items_to_fetch, peer_connection_ptr, &peer_and_items_to_fetch::peer> >,
666  boost::multi_index::ordered_non_unique<boost::multi_index::tag<requested_item_count_index>,
667  boost::multi_index::const_mem_fun<peer_and_items_to_fetch, size_t, &peer_and_items_to_fetch::number_of_items> > > > fetch_messages_to_send_set;
668  fetch_messages_to_send_set items_by_peer;
669 
670  // initialize the fetch_messages_to_send with an empty set of items for all idle peers
671  for (const peer_connection_ptr& peer : _active_connections)
672  if (peer->idle())
673  items_by_peer.insert(peer_and_items_to_fetch(peer));
674 
675  // now loop over all items we want to fetch
676  for (auto item_iter = _items_to_fetch.begin(); item_iter != _items_to_fetch.end();)
677  {
678  if (item_iter->timestamp < oldest_timestamp_to_fetch)
679  {
680  // this item has probably already fallen out of our peers' caches, we'll just ignore it.
681  // this can happen during flooding, and the _items_to_fetch could otherwise get clogged
682  // with a bunch of items that we'll never be able to request from any peer
683  wlog("Unable to fetch item ${item} before its likely expiration time, removing it from our list of items to fetch", ("item", item_iter->item));
684  item_iter = _items_to_fetch.erase(item_iter);
685  }
686  else
687  {
688  // find a peer that has it, we'll use the one who has the least requests going to it to load balance
689  bool item_fetched = false;
690  for (auto peer_iter = items_by_peer.get<requested_item_count_index>().begin(); peer_iter != items_by_peer.get<requested_item_count_index>().end(); ++peer_iter)
691  {
692  const peer_connection_ptr& peer = peer_iter->peer;
693  // if they have the item and we haven't already decided to ask them for too many other items
694  if (peer_iter->item_ids.size() < GRAPHENE_NET_MAX_ITEMS_PER_PEER_DURING_NORMAL_OPERATION &&
695  peer->inventory_peer_advertised_to_us.find(item_iter->item) != peer->inventory_peer_advertised_to_us.end())
696  {
697  if (item_iter->item.item_type == graphene::net::trx_message_type && peer->is_transaction_fetching_inhibited())
698  next_peer_unblocked_time = std::min(peer->transaction_fetching_inhibited_until, next_peer_unblocked_time);
699  else
700  {
701  //dlog("requesting item ${hash} from peer ${endpoint}",
702  // ("hash", iter->item.item_hash)("endpoint", peer->get_remote_endpoint()));
703  item_id item_id_to_fetch = item_iter->item;
704  peer->items_requested_from_peer.insert(peer_connection::item_to_time_map_type::value_type(item_id_to_fetch, fc::time_point::now()));
705  item_iter = _items_to_fetch.erase(item_iter);
706  item_fetched = true;
707  items_by_peer.get<requested_item_count_index>().modify(peer_iter, [&item_id_to_fetch](peer_and_items_to_fetch& peer_and_items) {
708  peer_and_items.item_ids.push_back(item_id_to_fetch);
709  });
710  break;
711  }
712  }
713  }
714  if (!item_fetched)
715  ++item_iter;
716  }
717  }
718 
719  // we've figured out which peer will be providing each item, now send the messages.
720  for (const peer_and_items_to_fetch& peer_and_items : items_by_peer)
721  {
722  // the item lists are heterogenous and
723  // the fetch_items_message can only deal with one item type at a time.
724  std::map<uint32_t, std::vector<item_hash_t> > items_to_fetch_by_type;
725  for (const item_id& item : peer_and_items.item_ids)
726  items_to_fetch_by_type[item.item_type].push_back(item.item_hash);
727  for (auto& items_by_type : items_to_fetch_by_type)
728  {
729  dlog("requesting ${count} items of type ${type} from peer ${endpoint}: ${hashes}",
730  ("count", items_by_type.second.size())("type", (uint32_t)items_by_type.first)
731  ("endpoint", peer_and_items.peer->get_remote_endpoint())
732  ("hashes", items_by_type.second));
733  peer_and_items.peer->send_message(fetch_items_message(items_by_type.first,
734  items_by_type.second));
735  }
736  }
737  items_by_peer.clear();
738 
740  {
741  _retrigger_fetch_item_loop_promise = fc::promise<void>::create("graphene::net::retrigger_fetch_item_loop");
742  fc::microseconds time_until_retrigger = fc::microseconds::maximum();
743  if (next_peer_unblocked_time != fc::time_point::maximum())
744  time_until_retrigger = next_peer_unblocked_time - fc::time_point::now();
745  try
746  {
747  if (time_until_retrigger > fc::microseconds(0))
748  _retrigger_fetch_item_loop_promise->wait(time_until_retrigger);
749  }
750  catch (const fc::timeout_exception&)
751  {
752  dlog("Resuming fetch_items_loop due to timeout -- one of our peers should no longer be throttled");
753  }
755  }
756  } // while (!canceled)
757  }
758 
760  {
765  }
766 
768  {
771  {
772  dlog("beginning an iteration of advertise inventory");
773  // swap inventory into local variable, clearing the node's copy
774  std::unordered_set<item_id> inventory_to_advertise;
775  inventory_to_advertise.swap(_new_inventory);
776 
777  // process all inventory to advertise and construct the inventory messages we'll send
778  // first, then send them all in a batch (to avoid any fiber interruption points while
779  // we're computing the messages)
780  std::list<std::pair<peer_connection_ptr, item_ids_inventory_message> > inventory_messages_to_send;
781 
782  for (const peer_connection_ptr& peer : _active_connections)
783  {
784  // only advertise to peers who are in sync with us
785  idump((peer->peer_needs_sync_items_from_us));
786  if( !peer->peer_needs_sync_items_from_us )
787  {
788  std::map<uint32_t, std::vector<item_hash_t> > items_to_advertise_by_type;
789  // don't send the peer anything we've already advertised to it
790  // or anything it has advertised to us
791  // group the items we need to send by type, because we'll need to send one inventory message per type
792  unsigned total_items_to_send_to_this_peer = 0;
793  idump((inventory_to_advertise));
794  for (const item_id& item_to_advertise : inventory_to_advertise)
795  {
796  auto adv_to_peer = peer->inventory_advertised_to_peer.find(item_to_advertise);
797  auto adv_to_us = peer->inventory_peer_advertised_to_us.find(item_to_advertise);
798 
799  if (adv_to_peer == peer->inventory_advertised_to_peer.end() &&
800  adv_to_us == peer->inventory_peer_advertised_to_us.end())
801  {
802  items_to_advertise_by_type[item_to_advertise.item_type].push_back(item_to_advertise.item_hash);
803  peer->inventory_advertised_to_peer.insert(peer_connection::timestamped_item_id(item_to_advertise, fc::time_point::now()));
804  ++total_items_to_send_to_this_peer;
805  if (item_to_advertise.item_type == trx_message_type)
806  testnetlog("advertising transaction ${id} to peer ${endpoint}", ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint()));
807  dlog("advertising item ${id} to peer ${endpoint}", ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint()));
808  }
809  else
810  {
811  if (adv_to_peer != peer->inventory_advertised_to_peer.end() )
812  idump( (*adv_to_peer) );
813  if (adv_to_us != peer->inventory_peer_advertised_to_us.end() )
814  idump( (*adv_to_us) );
815  }
816  }
817  dlog("advertising ${count} new item(s) of ${types} type(s) to peer ${endpoint}",
818  ("count", total_items_to_send_to_this_peer)
819  ("types", items_to_advertise_by_type.size())
820  ("endpoint", peer->get_remote_endpoint()));
821  for (auto items_group : items_to_advertise_by_type)
822  inventory_messages_to_send.push_back(std::make_pair(peer, item_ids_inventory_message(items_group.first, items_group.second)));
823  }
824  peer->clear_old_inventory();
825  }
826 
827  for (auto iter = inventory_messages_to_send.begin(); iter != inventory_messages_to_send.end(); ++iter)
828  iter->first->send_message(iter->second);
829  inventory_messages_to_send.clear();
830 
831  if (_new_inventory.empty())
832  {
833  _retrigger_advertise_inventory_loop_promise = fc::promise<void>::create("graphene::net::retrigger_advertise_inventory_loop");
836  }
837  } // while(!canceled)
838  }
839 
841  {
845  }
846 
848  {
850  std::list<peer_connection_ptr> peers_to_disconnect_gently;
851  std::list<peer_connection_ptr> peers_to_disconnect_forcibly;
852  std::list<peer_connection_ptr> peers_to_send_keep_alive;
853  std::list<peer_connection_ptr> peers_to_terminate;
854 
855  _recent_block_interval_in_seconds = _delegate->get_current_block_interval_in_seconds();
856 
857  // Disconnect peers that haven't sent us any data recently
858  // These numbers are just guesses and we need to think through how this works better.
859  // If we and our peers get disconnected from the rest of the network, we will not
860  // receive any blocks or transactions from the rest of the world, and that will
861  // probably make us disconnect from our peers even though we have working connections to
862  // them (but they won't have sent us anything since they aren't getting blocks either).
863  // This might not be so bad because it could make us initiate more connections and
864  // reconnect with the rest of the network, or it might just futher isolate us.
865  {
866  // As usual, the first step is to walk through all our peers and figure out which
867  // peers need action (disconneting, sending keepalives, etc), then we walk through
868  // those lists yielding at our leisure later.
870 
871  uint32_t handshaking_timeout = _peer_inactivity_timeout;
872  fc::time_point handshaking_disconnect_threshold = fc::time_point::now() - fc::seconds(handshaking_timeout);
873  for( const peer_connection_ptr handshaking_peer : _handshaking_connections )
874  if( handshaking_peer->connection_initiation_time < handshaking_disconnect_threshold &&
875  handshaking_peer->get_last_message_received_time() < handshaking_disconnect_threshold &&
876  handshaking_peer->get_last_message_sent_time() < handshaking_disconnect_threshold )
877  {
878  wlog( "Forcibly disconnecting from handshaking peer ${peer} due to inactivity of at least ${timeout} seconds",
879  ( "peer", handshaking_peer->get_remote_endpoint() )("timeout", handshaking_timeout ) );
880  wlog("Peer's negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}",
881  ("status", handshaking_peer->negotiation_status)
882  ("sent", handshaking_peer->get_total_bytes_sent())
883  ("received", handshaking_peer->get_total_bytes_received()));
884  handshaking_peer->connection_closed_error = fc::exception(FC_LOG_MESSAGE(warn, "Terminating handshaking connection due to inactivity of ${timeout} seconds. Negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}",
885  ("peer", handshaking_peer->get_remote_endpoint())
886  ("timeout", handshaking_timeout)
887  ("status", handshaking_peer->negotiation_status)
888  ("sent", handshaking_peer->get_total_bytes_sent())
889  ("received", handshaking_peer->get_total_bytes_received())));
890  peers_to_disconnect_forcibly.push_back( handshaking_peer );
891  }
892 
893  // timeout for any active peers is two block intervals
894  uint32_t active_disconnect_timeout = 10 * _recent_block_interval_in_seconds;
895  uint32_t active_send_keepalive_timeout = active_disconnect_timeout / 2;
896 
897  // set the ignored request time out to 6 second. When we request a block
898  // or transaction from a peer, this timeout determines how long we wait for them
899  // to reply before we give up and ask another peer for the item.
900  // Ideally this should be significantly shorter than the block interval, because
901  // we'd like to realize the block isn't coming and fetch it from a different
902  // peer before the next block comes in.
903  // Increased to 6 from 1 in #1660 due to heavy load. May need to adjust further
904  // Note: #1660 is https://github.com/steemit/steem/issues/1660
905  fc::microseconds active_ignored_request_timeout = fc::seconds(6);
906 
907  fc::time_point active_disconnect_threshold = fc::time_point::now() - fc::seconds(active_disconnect_timeout);
908  fc::time_point active_send_keepalive_threshold = fc::time_point::now() - fc::seconds(active_send_keepalive_timeout);
909  fc::time_point active_ignored_request_threshold = fc::time_point::now() - active_ignored_request_timeout;
910  for( const peer_connection_ptr& active_peer : _active_connections )
911  {
912  if( active_peer->connection_initiation_time < active_disconnect_threshold &&
913  active_peer->get_last_message_received_time() < active_disconnect_threshold )
914  {
915  wlog( "Closing connection with peer ${peer} due to inactivity of at least ${timeout} seconds",
916  ( "peer", active_peer->get_remote_endpoint() )("timeout", active_disconnect_timeout ) );
917  peers_to_disconnect_gently.push_back( active_peer );
918  }
919  else
920  {
921  bool disconnect_due_to_request_timeout = false;
922  if (!active_peer->sync_items_requested_from_peer.empty() &&
923  active_peer->last_sync_item_received_time < active_ignored_request_threshold)
924  {
925  wlog("Disconnecting peer ${peer} because they haven't made any progress on my remaining ${count} sync item requests",
926  ("peer", active_peer->get_remote_endpoint())("count", active_peer->sync_items_requested_from_peer.size()));
927  disconnect_due_to_request_timeout = true;
928  }
929  if (!disconnect_due_to_request_timeout &&
930  active_peer->item_ids_requested_from_peer &&
931  active_peer->item_ids_requested_from_peer->get<1>() < active_ignored_request_threshold)
932  {
933  wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ids after ${synopsis}",
934  ("peer", active_peer->get_remote_endpoint())
935  ("synopsis", active_peer->item_ids_requested_from_peer->get<0>()));
936  disconnect_due_to_request_timeout = true;
937  }
938  if (!disconnect_due_to_request_timeout)
939  for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->items_requested_from_peer)
940  if (item_and_time.second < active_ignored_request_threshold)
941  {
942  wlog("Disconnecting peer ${peer} because they didn't respond to my request for item ${id}",
943  ("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash));
944  disconnect_due_to_request_timeout = true;
945  break;
946  }
947  if (disconnect_due_to_request_timeout)
948  {
949  // we should probably disconnect nicely and give them a reason, but right now the logic
950  // for rescheduling the requests only executes when the connection is fully closed,
951  // and we want to get those requests rescheduled as soon as possible
952  peers_to_disconnect_forcibly.push_back(active_peer);
953  }
954  else if (active_peer->connection_initiation_time < active_send_keepalive_threshold &&
955  active_peer->get_last_message_received_time() < active_send_keepalive_threshold)
956  {
957  wlog( "Sending a keepalive message to peer ${peer} who hasn't sent us any messages in the last ${timeout} seconds",
958  ( "peer", active_peer->get_remote_endpoint() )("timeout", active_send_keepalive_timeout ) );
959  peers_to_send_keep_alive.push_back(active_peer);
960  }
961  else if (active_peer->we_need_sync_items_from_peer &&
962  !active_peer->is_currently_handling_message() &&
963  !active_peer->item_ids_requested_from_peer &&
964  active_peer->ids_of_items_to_get.empty())
965  {
966  // This is a state we should never get into in the first place, but if we do, we should disconnect the peer
967  // to re-establish the connection.
968  fc_wlog(fc::logger::get("sync"), "Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.",
969  ("peer", active_peer->get_remote_endpoint()));
970  wlog("Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.",
971  ("peer", active_peer->get_remote_endpoint()));
972  peers_to_disconnect_forcibly.push_back(active_peer);
973  }
974  }
975  }
976 
978  for( const peer_connection_ptr& closing_peer : _closing_connections )
979  if( closing_peer->connection_closed_time < closing_disconnect_threshold )
980  {
981  // we asked this peer to close their connectoin to us at least GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT
982  // seconds ago, but they haven't done it yet. Terminate the connection now
983  wlog( "Forcibly disconnecting peer ${peer} who failed to close their connection in a timely manner",
984  ( "peer", closing_peer->get_remote_endpoint() ) );
985  peers_to_disconnect_forcibly.push_back( closing_peer );
986  }
987 
988  uint32_t failed_terminate_timeout_seconds = 120;
989  fc::time_point failed_terminate_threshold = fc::time_point::now() - fc::seconds(failed_terminate_timeout_seconds);
990  for (const peer_connection_ptr& peer : _terminating_connections )
991  if (peer->get_connection_terminated_time() != fc::time_point::min() &&
992  peer->get_connection_terminated_time() < failed_terminate_threshold)
993  {
994  wlog("Terminating connection with peer ${peer}, closing the connection didn't work", ("peer", peer->get_remote_endpoint()));
995  peers_to_terminate.push_back(peer);
996  }
997 
998  // That's the end of the sorting step; now all peers that require further processing are now in one of the
999  // lists peers_to_disconnect_gently, peers_to_disconnect_forcibly, peers_to_send_keep_alive, or peers_to_terminate
1000 
1001  // if we've decided to delete any peers, do it now; in its current implementation this doesn't yield,
1002  // and once we start yielding, we may find that we've moved that peer to another list (closed or active)
1003  // and that triggers assertions, maybe even errors
1004  for (const peer_connection_ptr& peer : peers_to_terminate )
1005  {
1006  assert(_terminating_connections.find(peer) != _terminating_connections.end());
1007  _terminating_connections.erase(peer);
1009  }
1010  peers_to_terminate.clear();
1011 
1012  // if we're going to abruptly disconnect anyone, do it here
1013  // (it doesn't yield). I don't think there would be any harm if this were
1014  // moved to the yielding section
1015  for( const peer_connection_ptr& peer : peers_to_disconnect_forcibly )
1016  {
1018  peer->close_connection();
1019  }
1020  peers_to_disconnect_forcibly.clear();
1021  } // end ASSERT_TASK_NOT_PREEMPTED()
1022 
1023  // Now process the peers that we need to do yielding functions with (disconnect sends a message with the
1024  // disconnect reason, so it may yield)
1025  for( const peer_connection_ptr& peer : peers_to_disconnect_gently )
1026  {
1027  fc::exception detailed_error( FC_LOG_MESSAGE(warn, "Disconnecting due to inactivity",
1028  ( "last_message_received_seconds_ago", (peer->get_last_message_received_time() - fc::time_point::now() ).count() / fc::seconds(1 ).count() )
1029  ( "last_message_sent_seconds_ago", (peer->get_last_message_sent_time() - fc::time_point::now() ).count() / fc::seconds(1 ).count() )
1030  ( "inactivity_timeout", _active_connections.find(peer ) != _active_connections.end() ? _peer_inactivity_timeout * 10 : _peer_inactivity_timeout ) ) );
1031  disconnect_from_peer( peer.get(), "Disconnecting due to inactivity", false, detailed_error );
1032  }
1033  peers_to_disconnect_gently.clear();
1034 
1035  for( const peer_connection_ptr& peer : peers_to_send_keep_alive )
1036  peer->send_message(current_time_request_message(),
1037  offsetof(current_time_request_message, request_sent_time));
1038  peers_to_send_keep_alive.clear();
1039 
1043  "terminate_inactive_connections_loop" );
1044  }
1045 
1047  {
1049 
1050  std::list<peer_connection_ptr> original_active_peers(_active_connections.begin(), _active_connections.end());
1051  for( const peer_connection_ptr& active_peer : original_active_peers )
1052  {
1053  try
1054  {
1055  active_peer->send_message(address_request_message());
1056  }
1057  catch ( const fc::canceled_exception& )
1058  {
1059  throw;
1060  }
1061  catch (const fc::exception& e)
1062  {
1063  dlog("Caught exception while sending address request message to peer ${peer} : ${e}",
1064  ("peer", active_peer->get_remote_endpoint())("e", e));
1065  }
1066  }
1067 
1068  // this has nothing to do with updating the peer list, but we need to prune this list
1069  // at regular intervals, this is a fine place to do it.
1070  fc::time_point_sec oldest_failed_ids_to_keep(fc::time_point::now() - fc::minutes(15));
1071  auto oldest_failed_ids_to_keep_iter = _recently_failed_items.get<peer_connection::timestamp_index>().lower_bound(oldest_failed_ids_to_keep);
1072  auto begin_iter = _recently_failed_items.get<peer_connection::timestamp_index>().begin();
1073  _recently_failed_items.get<peer_connection::timestamp_index>().erase(begin_iter, oldest_failed_ids_to_keep_iter);
1074 
1078  "fetch_updated_peer_lists_loop" );
1079  }
1080  void node_impl::update_bandwidth_data(uint32_t bytes_read_this_second, uint32_t bytes_written_this_second)
1081  {
1083  _average_network_read_speed_seconds.push_back(bytes_read_this_second);
1084  _average_network_write_speed_seconds.push_back(bytes_written_this_second);
1087  {
1090  uint32_t average_read_this_minute = (uint32_t)boost::accumulate(_average_network_read_speed_seconds, uint64_t(0)) / (uint32_t)_average_network_read_speed_seconds.size();
1091  _average_network_read_speed_minutes.push_back(average_read_this_minute);
1092  uint32_t average_written_this_minute = (uint32_t)boost::accumulate(_average_network_write_speed_seconds, uint64_t(0)) / (uint32_t)_average_network_write_speed_seconds.size();
1093  _average_network_write_speed_minutes.push_back(average_written_this_minute);
1095  {
1097  uint32_t average_read_this_hour = (uint32_t)boost::accumulate(_average_network_read_speed_minutes, uint64_t(0)) / (uint32_t)_average_network_read_speed_minutes.size();
1098  _average_network_read_speed_hours.push_back(average_read_this_hour);
1099  uint32_t average_written_this_hour = (uint32_t)boost::accumulate(_average_network_write_speed_minutes, uint64_t(0)) / (uint32_t)_average_network_write_speed_minutes.size();
1100  _average_network_write_speed_hours.push_back(average_written_this_hour);
1101  }
1102  }
1103  }
1105  {
1107  fc::time_point_sec current_time = fc::time_point::now();
1108 
1110  _bandwidth_monitor_last_update_time = current_time;
1111 
1112  uint32_t seconds_since_last_update = current_time.sec_since_epoch() - _bandwidth_monitor_last_update_time.sec_since_epoch();
1113  seconds_since_last_update = std::max(UINT32_C(1), seconds_since_last_update);
1114  uint32_t bytes_read_this_second = _rate_limiter.get_actual_download_rate();
1115  uint32_t bytes_written_this_second = _rate_limiter.get_actual_upload_rate();
1116  for (uint32_t i = 0; i < seconds_since_last_update - 1; ++i)
1117  update_bandwidth_data(0, 0);
1118  update_bandwidth_data(bytes_read_this_second, bytes_written_this_second);
1119  _bandwidth_monitor_last_update_time = current_time;
1120 
1124  "bandwidth_monitor_loop" );
1125  }
1126 
1128  {
1130  dump_node_status();
1134  "dump_node_status_task");
1135  }
1136 
1138  {
1140 #ifdef USE_PEERS_TO_DELETE_MUTEX
1141  fc::scoped_lock<fc::mutex> lock(_peers_to_delete_mutex);
1142  dlog("in delayed_peer_deletion_task with ${count} in queue", ("count", _peers_to_delete.size()));
1143  _peers_to_delete.clear();
1144  dlog("_peers_to_delete cleared");
1145 #else
1146  while (!_peers_to_delete.empty())
1147  {
1148  std::list<peer_connection_ptr> peers_to_delete_copy;
1149  dlog("beginning an iteration of delayed_peer_deletion_task with ${count} in queue", ("count", _peers_to_delete.size()));
1150  peers_to_delete_copy.swap(_peers_to_delete);
1151  }
1152  dlog("leaving delayed_peer_deletion_task");
1153 #endif
1154  }
1155 
1157  {
1159 
1160  assert(_handshaking_connections.find(peer_to_delete) == _handshaking_connections.end());
1161  assert(_active_connections.find(peer_to_delete) == _active_connections.end());
1162  assert(_closing_connections.find(peer_to_delete) == _closing_connections.end());
1163  assert(_terminating_connections.find(peer_to_delete) == _terminating_connections.end());
1164 
1165 #ifdef USE_PEERS_TO_DELETE_MUTEX
1166  dlog("scheduling peer for deletion: ${peer} (may block on a mutex here)", ("peer", peer_to_delete->get_remote_endpoint()));
1167 
1168  unsigned number_of_peers_to_delete;
1169  {
1170  fc::scoped_lock<fc::mutex> lock(_peers_to_delete_mutex);
1171  _peers_to_delete.emplace_back(peer_to_delete);
1172  number_of_peers_to_delete = _peers_to_delete.size();
1173  }
1174  dlog("peer scheduled for deletion: ${peer}", ("peer", peer_to_delete->get_remote_endpoint()));
1175 
1176  if (!_node_is_shutting_down &&
1178  {
1179  dlog("asyncing delayed_peer_deletion_task to delete ${size} peers", ("size", number_of_peers_to_delete));
1180  _delayed_peer_deletion_task_done = fc::async([this](){ delayed_peer_deletion_task(); }, "delayed_peer_deletion_task" );
1181  }
1182  else
1183  dlog("delayed_peer_deletion_task is already scheduled (current size of _peers_to_delete is ${size})", ("size", number_of_peers_to_delete));
1184 #else
1185  dlog("scheduling peer for deletion: ${peer} (this will not block)", ("peer", peer_to_delete->get_remote_endpoint()));
1186  _peers_to_delete.push_back(peer_to_delete);
1187  if (!_node_is_shutting_down &&
1189  {
1190  dlog("asyncing delayed_peer_deletion_task to delete ${size} peers", ("size", _peers_to_delete.size()));
1191  _delayed_peer_deletion_task_done = fc::async([this](){ delayed_peer_deletion_task(); }, "delayed_peer_deletion_task" );
1192  }
1193  else
1194  dlog("delayed_peer_deletion_task is already scheduled (current size of _peers_to_delete is ${size})", ("size", _peers_to_delete.size()));
1195 
1196 #endif
1197  }
1198 
1200  {
1203  }
1204 
1206  {
1209  }
1210 
1212  {
1214  return (uint32_t)(_handshaking_connections.size() + _active_connections.size());
1215  }
1216 
1218  {
1219  for (const peer_connection_ptr& active_peer : _active_connections)
1220  if (node_id == active_peer->node_id)
1221  return active_peer;
1222  for (const peer_connection_ptr& handshaking_peer : _handshaking_connections)
1223  if (node_id == handshaking_peer->node_id)
1224  return handshaking_peer;
1225  return peer_connection_ptr();
1226  }
1227 
1229  {
1231  if (node_id == _node_id)
1232  {
1233  dlog("is_already_connected_to_id returning true because the peer is us");
1234  return true;
1235  }
1236  for (const peer_connection_ptr active_peer : _active_connections)
1237  if (node_id == active_peer->node_id)
1238  {
1239  dlog("is_already_connected_to_id returning true because the peer is already in our active list");
1240  return true;
1241  }
1242  for (const peer_connection_ptr handshaking_peer : _handshaking_connections)
1243  if (node_id == handshaking_peer->node_id)
1244  {
1245  dlog("is_already_connected_to_id returning true because the peer is already in our handshaking list");
1246  return true;
1247  }
1248  return false;
1249  }
1250 
1251  // merge addresses received from a peer into our database
1252  bool node_impl::merge_address_info_with_potential_peer_database(const std::vector<address_info> addresses)
1253  {
1255  bool new_information_received = false;
1256  for (const address_info& address : addresses)
1257  {
1258  if (address.firewalled == graphene::net::firewalled_state::not_firewalled)
1259  {
1260  potential_peer_record updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_endpoint(address.remote_endpoint);
1261  if (address.last_seen_time > updated_peer_record.last_seen_time)
1262  new_information_received = true;
1263  updated_peer_record.last_seen_time = std::max(address.last_seen_time, updated_peer_record.last_seen_time);
1264  _potential_peer_db.update_entry(updated_peer_record);
1265  }
1266  }
1267  return new_information_received;
1268  }
1269 
1271  {
1273  dlog("Currently have ${current} of [${desired}/${max}] connections",
1274  ("current", get_number_of_connections())
1275  ("desired", _desired_number_of_connections)
1277  dlog(" my id is ${id}", ("id", _node_id));
1278 
1279  for (const peer_connection_ptr& active_connection : _active_connections)
1280  {
1281  dlog(" active: ${endpoint} with ${id} [${direction}]",
1282  ("endpoint", active_connection->get_remote_endpoint())
1283  ("id", active_connection->node_id)
1284  ("direction", active_connection->direction));
1285  }
1286  for (const peer_connection_ptr& handshaking_connection : _handshaking_connections)
1287  {
1288  dlog(" handshaking: ${endpoint} with ${id} [${direction}]",
1289  ("endpoint", handshaking_connection->get_remote_endpoint())
1290  ("id", handshaking_connection->node_id)
1291  ("direction", handshaking_connection->direction));
1292  }
1293  }
1294 
1295  void node_impl::on_message( peer_connection* originating_peer, const message& received_message )
1296  {
1298  message_hash_type message_hash = received_message.id();
1299  dlog("handling message ${type} ${hash} size ${size} from peer ${endpoint}",
1300  ("type", graphene::net::core_message_type_enum(received_message.msg_type.value()))("hash", message_hash)
1301  ("size", received_message.size)
1302  ("endpoint", originating_peer->get_remote_endpoint()));
1303  switch ( received_message.msg_type.value() )
1304  {
1306  on_hello_message(originating_peer, received_message.as<hello_message>());
1307  break;
1309  on_connection_accepted_message(originating_peer, received_message.as<connection_accepted_message>());
1310  break;
1312  on_connection_rejected_message(originating_peer, received_message.as<connection_rejected_message>());
1313  break;
1315  on_address_request_message(originating_peer, received_message.as<address_request_message>());
1316  break;
1318  on_address_message(originating_peer, received_message.as<address_message>());
1319  break;
1321  on_fetch_blockchain_item_ids_message(originating_peer, received_message.as<fetch_blockchain_item_ids_message>());
1322  break;
1325  break;
1327  on_fetch_items_message(originating_peer, received_message.as<fetch_items_message>());
1328  break;
1330  on_item_not_available_message(originating_peer, received_message.as<item_not_available_message>());
1331  break;
1333  on_item_ids_inventory_message(originating_peer, received_message.as<item_ids_inventory_message>());
1334  break;
1336  on_closing_connection_message(originating_peer, received_message.as<closing_connection_message>());
1337  break;
1339  process_block_message(originating_peer, received_message, message_hash);
1340  break;
1342  on_current_time_request_message(originating_peer, received_message.as<current_time_request_message>());
1343  break;
1345  on_current_time_reply_message(originating_peer, received_message.as<current_time_reply_message>());
1346  break;
1348  on_check_firewall_message(originating_peer, received_message.as<check_firewall_message>());
1349  break;
1351  on_check_firewall_reply_message(originating_peer, received_message.as<check_firewall_reply_message>());
1352  break;
1355  break;
1358  break;
1359 
1360  default:
1361  // ignore any message in between core_message_type_first and _last that we don't handle above
1362  // to allow us to add messages in the future
1363  if (received_message.msg_type.value() < core_message_type_enum::core_message_type_first ||
1364  received_message.msg_type.value() > core_message_type_enum::core_message_type_last)
1365  process_ordinary_message(originating_peer, received_message, message_hash);
1366  break;
1367  }
1368  }
1369 
1370 
1372  {
1374  // for the time being, shoehorn a bunch of properties into the user_data variant object,
1375  // which lets us add and remove fields without changing the protocol. Once we
1376  // settle on what we really want in there, we'll likely promote them to first
1377  // class fields in the hello message
1378  fc::mutable_variant_object user_data;
1379  user_data["fc_git_revision_sha"] = fc::git_revision_sha;
1380  user_data["fc_git_revision_unix_timestamp"] = fc::git_revision_unix_timestamp;
1381 #if defined( __APPLE__ )
1382  user_data["platform"] = "osx";
1383 #elif defined( __OpenBSD__ )
1384  user_data["platform"] = "obsd";
1385 #elif defined( __linux__ )
1386  user_data["platform"] = "linux";
1387 #elif defined( _MSC_VER )
1388  user_data["platform"] = "win32";
1389 #else
1390  user_data["platform"] = "other";
1391 #endif
1392  user_data["bitness"] = sizeof(void*) * 8;
1393 
1394  user_data["node_id"] = fc::variant( _node_id, 1 );
1395 
1396  item_hash_t head_block_id = _delegate->get_head_block_id();
1397  user_data["last_known_block_hash"] = fc::variant( head_block_id, 1 );
1398  user_data["last_known_block_number"] = _delegate->get_block_number(head_block_id);
1399  user_data["last_known_block_time"] = _delegate->get_block_time(head_block_id);
1400 
1401  if (!_hard_fork_block_numbers.empty())
1402  user_data["last_known_fork_block_number"] = _hard_fork_block_numbers.back();
1403 
1404  return user_data;
1405  }
1407  {
1409  // try to parse data out of the user_agent string
1410  if (user_data.contains("graphene_git_revision_sha"))
1411  originating_peer->graphene_git_revision_sha = user_data["graphene_git_revision_sha"].as_string();
1412  if (user_data.contains("graphene_git_revision_unix_timestamp"))
1413  originating_peer->graphene_git_revision_unix_timestamp = fc::time_point_sec(user_data["graphene_git_revision_unix_timestamp"].as<uint32_t>(1));
1414  if (user_data.contains("fc_git_revision_sha"))
1415  originating_peer->fc_git_revision_sha = user_data["fc_git_revision_sha"].as_string();
1416  if (user_data.contains("fc_git_revision_unix_timestamp"))
1417  originating_peer->fc_git_revision_unix_timestamp = fc::time_point_sec(user_data["fc_git_revision_unix_timestamp"].as<uint32_t>(1));
1418  if (user_data.contains("platform"))
1419  originating_peer->platform = user_data["platform"].as_string();
1420  if (user_data.contains("bitness"))
1421  originating_peer->bitness = user_data["bitness"].as<uint32_t>(1);
1422  if (user_data.contains("node_id"))
1423  originating_peer->node_id = user_data["node_id"].as<node_id_t>(1);
1424  if (user_data.contains("last_known_fork_block_number"))
1425  originating_peer->last_known_fork_block_number = user_data["last_known_fork_block_number"].as<uint32_t>(1);
1426  }
1427 
1428  void node_impl::on_hello_message( peer_connection* originating_peer, const hello_message& hello_message_received )
1429  {
1431  // this already_connected check must come before we fill in peer data below
1432  node_id_t peer_node_id = hello_message_received.node_public_key;
1433  try
1434  {
1435  peer_node_id = hello_message_received.user_data["node_id"].as<node_id_t>(1);
1436  }
1437  catch (const fc::exception&)
1438  {
1439  // either it's not there or it's not a valid session id. either way, ignore.
1440  }
1441  bool already_connected_to_this_peer = is_already_connected_to_id(peer_node_id);
1442 
1443  // validate the node id
1444  fc::sha256::encoder shared_secret_encoder;
1445  fc::sha512 shared_secret = originating_peer->get_shared_secret();
1446  shared_secret_encoder.write(shared_secret.data(), sizeof(shared_secret));
1447  fc::ecc::public_key expected_node_public_key(hello_message_received.signed_shared_secret, shared_secret_encoder.result(), false);
1448 
1449  // store off the data provided in the hello message
1450  originating_peer->user_agent = hello_message_received.user_agent;
1451  originating_peer->node_public_key = hello_message_received.node_public_key;
1452  originating_peer->node_id = hello_message_received.node_public_key; // will probably be overwritten in parse_hello_user_data_for_peer()
1453  originating_peer->core_protocol_version = hello_message_received.core_protocol_version;
1454  originating_peer->inbound_address = hello_message_received.inbound_address;
1455  originating_peer->inbound_port = hello_message_received.inbound_port;
1456  originating_peer->outbound_port = hello_message_received.outbound_port;
1457 
1458  parse_hello_user_data_for_peer(originating_peer, hello_message_received.user_data);
1459 
1460  // if they didn't provide a last known fork, try to guess it
1461  if (originating_peer->last_known_fork_block_number == 0 &&
1462  originating_peer->graphene_git_revision_unix_timestamp)
1463  {
1464  uint32_t unix_timestamp = originating_peer->graphene_git_revision_unix_timestamp->sec_since_epoch();
1465  originating_peer->last_known_fork_block_number = _delegate->estimate_last_known_fork_from_git_revision_timestamp(unix_timestamp);
1466  }
1467 
1468  // now decide what to do with it
1470  {
1471  if (hello_message_received.node_public_key != expected_node_public_key.serialize())
1472  {
1473  wlog("Invalid signature in hello message from peer ${peer}", ("peer", originating_peer->get_remote_endpoint()));
1474  std::string rejection_message("Invalid signature in hello message");
1476  originating_peer->get_socket().remote_endpoint(),
1478  rejection_message);
1479 
1481  originating_peer->send_message( message(connection_rejected ) );
1482  // for this type of message, we're immediately disconnecting this peer
1483  disconnect_from_peer( originating_peer, "Invalid signature in hello message" );
1484  return;
1485  }
1486  if (hello_message_received.chain_id != _chain_id)
1487  {
1488  wlog("Received hello message from peer on a different chain: ${message}", ("message", hello_message_received));
1489  std::ostringstream rejection_message;
1490  rejection_message << "You're on a different chain than I am. I'm on " << _chain_id.str() <<
1491  " and you're on " << hello_message_received.chain_id.str();
1493  originating_peer->get_socket().remote_endpoint(),
1495  rejection_message.str());
1496 
1498  originating_peer->send_message(message(connection_rejected));
1499  // for this type of message, we're immediately disconnecting this peer, instead of trying to
1500  // allowing her to ask us for peers (any of our peers will be on the same chain as us, so there's no
1501  // benefit of sharing them)
1502  disconnect_from_peer(originating_peer, "You are on a different chain from me");
1503  return;
1504  }
1505  if (originating_peer->last_known_fork_block_number != 0)
1506  {
1507  uint32_t next_fork_block_number = get_next_known_hard_fork_block_number(originating_peer->last_known_fork_block_number);
1508  if (next_fork_block_number != 0)
1509  {
1510  // we know about a fork they don't. See if we've already passed that block. If we have, don't let them
1511  // connect because we won't be able to give them anything useful
1512  uint32_t head_block_num = _delegate->get_block_number(_delegate->get_head_block_id());
1513  if (next_fork_block_number < head_block_num)
1514  {
1515 #ifdef ENABLE_DEBUG_ULOGS
1516  ulog("Rejecting connection from peer because their version is too old. Their version date: ${date}", ("date", originating_peer->graphene_git_revision_unix_timestamp));
1517 #endif
1518  wlog("Received hello message from peer running a version of that can only understand blocks up to #${their_hard_fork}, but I'm at head block number #${my_block_number}",
1519  ("their_hard_fork", next_fork_block_number)("my_block_number", head_block_num));
1520  std::ostringstream rejection_message;
1521  rejection_message << "Your client is outdated -- you can only understand blocks up to #" << next_fork_block_number << ", but I'm already on block #" << head_block_num;
1523  originating_peer->get_socket().remote_endpoint(),
1525  rejection_message.str() );
1526 
1528  originating_peer->send_message(message(connection_rejected));
1529  // for this type of message, we're immediately disconnecting this peer, instead of trying to
1530  // allowing her to ask us for peers (any of our peers will be on the same chain as us, so there's no
1531  // benefit of sharing them)
1532  disconnect_from_peer(originating_peer, "Your client is too old, please upgrade");
1533  return;
1534  }
1535  }
1536  }
1537  if (already_connected_to_this_peer)
1538  {
1539 
1540  connection_rejected_message connection_rejected;
1541  if (_node_id == originating_peer->node_id)
1543  originating_peer->get_socket().remote_endpoint(),
1545  "I'm connecting to myself");
1546  else
1548  originating_peer->get_socket().remote_endpoint(),
1550  "I'm already connected to you");
1552  originating_peer->send_message(message(connection_rejected));
1553  dlog("Received a hello_message from peer ${peer} that I'm already connected to (with id ${id}), rejection",
1554  ("peer", originating_peer->get_remote_endpoint())
1555  ("id", originating_peer->node_id));
1556  }
1557 #ifdef ENABLE_P2P_DEBUGGING_API
1558  else if(!_allowed_peers.empty() &&
1559  _allowed_peers.find(originating_peer->node_id) == _allowed_peers.end())
1560  {
1562  originating_peer->get_socket().remote_endpoint(),
1564  "you are not in my allowed_peers list");
1566  originating_peer->send_message( message(connection_rejected ) );
1567  dlog( "Received a hello_message from peer ${peer} who isn't in my allowed_peers list, rejection", ("peer", originating_peer->get_remote_endpoint() ) );
1568  }
1569 #endif // ENABLE_P2P_DEBUGGING_API
1570  else
1571  {
1572  // whether we're planning on accepting them as a peer or not, they seem to be a valid node,
1573  // so add them to our database if they're not firewalled
1574 
1575  // in the hello message, the peer sent us the IP address and port it thought it was connecting from.
1576  // If they match the IP and port we see, we assume that they're actually on the internet and they're not
1577  // firewalled.
1578  fc::ip::endpoint peers_actual_outbound_endpoint = originating_peer->get_socket().remote_endpoint();
1579  if( peers_actual_outbound_endpoint.get_address() == originating_peer->inbound_address &&
1580  peers_actual_outbound_endpoint.port() == originating_peer->outbound_port )
1581  {
1582  if( originating_peer->inbound_port == 0 )
1583  {
1584  dlog( "peer does not appear to be firewalled, but they did not give an inbound port so I'm treating them as if they are." );
1585  originating_peer->is_firewalled = firewalled_state::firewalled;
1586  }
1587  else
1588  {
1589  // peer is not firewalled, add it to our database
1590  fc::ip::endpoint peers_inbound_endpoint(originating_peer->inbound_address, originating_peer->inbound_port);
1591  potential_peer_record updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_endpoint(peers_inbound_endpoint);
1592  _potential_peer_db.update_entry(updated_peer_record);
1593  originating_peer->is_firewalled = firewalled_state::not_firewalled;
1594  }
1595  }
1596  else
1597  {
1598  dlog("peer is firewalled: they think their outbound endpoint is ${reported_endpoint}, but I see it as ${actual_endpoint}",
1599  ("reported_endpoint", fc::ip::endpoint(originating_peer->inbound_address, originating_peer->outbound_port))
1600  ("actual_endpoint", peers_actual_outbound_endpoint));
1601  originating_peer->is_firewalled = firewalled_state::firewalled;
1602  }
1603 
1605  {
1607  originating_peer->get_socket().remote_endpoint(),
1609  "not accepting any more incoming connections");
1611  originating_peer->send_message(message(connection_rejected));
1612  dlog("Received a hello_message from peer ${peer}, but I'm not accepting any more connections, rejection",
1613  ("peer", originating_peer->get_remote_endpoint()));
1614  }
1615  else
1616  {
1618  originating_peer->send_message(message(connection_accepted_message()));
1619  dlog("Received a hello_message from peer ${peer}, sending reply to accept connection",
1620  ("peer", originating_peer->get_remote_endpoint()));
1621  }
1622  }
1623  }
1624  else
1625  {
1626  // we can wind up here if we've connected to ourself, and the source and
1627  // destination endpoints are the same, causing messages we send out
1628  // to arrive back on the initiating socket instead of the receiving
1629  // socket. If we did a complete job of enumerating local addresses,
1630  // we could avoid directly connecting to ourselves, or at least detect
1631  // immediately when we did it and disconnect.
1632 
1633  // The only way I know of that we'd get an unexpected hello that we
1634  // can't really guard against is if we do a simulatenous open, we
1635  // probably need to think through that case. We're not attempting that
1636  // yet, though, so it's ok to just disconnect here.
1637  wlog("unexpected hello_message from peer, disconnecting");
1638  disconnect_from_peer(originating_peer, "Received a unexpected hello_message");
1639  }
1640  }
1641 
1642  void node_impl::on_connection_accepted_message(peer_connection* originating_peer, const connection_accepted_message& connection_accepted_message_received)
1643  {
1645  dlog("Received a connection_accepted in response to my \"hello\" from ${peer}", ("peer", originating_peer->get_remote_endpoint()));
1648  originating_peer->send_message(address_request_message());
1652  originating_peer->core_protocol_version >= 106)
1653  {
1654  wlog("I don't know if I'm firewalled. Sending a firewall check message to peer ${peer}",
1655  ("peer", originating_peer->get_remote_endpoint()));
1656  originating_peer->firewall_check_state = new firewall_check_state_data;
1657 
1658  originating_peer->send_message(check_firewall_message());
1660  }
1661  }
1662 
1663  void node_impl::on_connection_rejected_message(peer_connection* originating_peer, const connection_rejected_message& connection_rejected_message_received)
1664  {
1667  {
1668  ilog("Received a rejection from ${peer} in response to my \"hello\", reason: \"${reason}\"",
1669  ("peer", originating_peer->get_remote_endpoint())
1670  ("reason", connection_rejected_message_received.reason_string));
1671 
1672  if (connection_rejected_message_received.reason_code == rejection_reason_code::connected_to_self)
1673  {
1674  _potential_peer_db.erase(originating_peer->get_socket().remote_endpoint());
1675  move_peer_to_closing_list(originating_peer->shared_from_this());
1676  originating_peer->close_connection();
1677  }
1678  else
1679  {
1680  // update our database to record that we were rejected so we won't try to connect again for a while
1681  // this only happens on connections we originate, so we should already know that peer is not firewalled
1683  if (updated_peer_record)
1684  {
1685  updated_peer_record->last_connection_disposition = last_connection_rejected;
1686  updated_peer_record->last_connection_attempt_time = fc::time_point::now();
1687  _potential_peer_db.update_entry(*updated_peer_record);
1688  }
1689  }
1690 
1693  originating_peer->send_message(address_request_message());
1694  }
1695  else
1696  FC_THROW( "unexpected connection_rejected_message from peer" );
1697  }
1698 
1699  void node_impl::on_address_request_message(peer_connection* originating_peer, const address_request_message& address_request_message_received)
1700  {
1702  dlog("Received an address request message");
1703 
1704  address_message reply;
1706  {
1707  reply.addresses.reserve(_active_connections.size());
1708  for (const peer_connection_ptr& active_peer : _active_connections)
1709  {
1710  fc::optional<potential_peer_record> updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*active_peer->get_remote_endpoint());
1711  if (updated_peer_record)
1712  {
1713  updated_peer_record->last_seen_time = fc::time_point::now();
1714  _potential_peer_db.update_entry(*updated_peer_record);
1715  }
1716 
1717  reply.addresses.emplace_back(address_info(*active_peer->get_remote_endpoint(),
1719  active_peer->round_trip_delay,
1720  active_peer->node_id,
1721  active_peer->direction,
1722  active_peer->is_firewalled));
1723  }
1724  }
1725  originating_peer->send_message(reply);
1726  }
1727 
1728  void node_impl::on_address_message(peer_connection* originating_peer, const address_message& address_message_received)
1729  {
1731  dlog("Received an address message containing ${size} addresses", ("size", address_message_received.addresses.size()));
1732  for (const address_info& address : address_message_received.addresses)
1733  {
1734  dlog(" ${endpoint} last seen ${time}", ("endpoint", address.remote_endpoint)("time", address.last_seen_time));
1735  }
1736  std::vector<graphene::net::address_info> updated_addresses = address_message_received.addresses;
1737  for (address_info& address : updated_addresses)
1739  bool new_information_received = merge_address_info_with_potential_peer_database(updated_addresses);
1740  if (new_information_received)
1742 
1743  if (_handshaking_connections.find(originating_peer->shared_from_this()) != _handshaking_connections.end())
1744  {
1745  // if we were handshaking, we need to continue with the next step in handshaking (which is either
1746  // ending handshaking and starting synchronization or disconnecting)
1748  disconnect_from_peer(originating_peer, "You rejected my connection request (hello message) so I'm disconnecting");
1750  disconnect_from_peer(originating_peer, "I rejected your connection request (hello message) so I'm disconnecting");
1751  else
1752  {
1753  fc::optional<fc::ip::endpoint> inbound_endpoint = originating_peer->get_endpoint_for_connecting();
1754  if (inbound_endpoint)
1755  {
1756  // mark the connection as successful in the database
1757  fc::optional<potential_peer_record> updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint);
1758  if (updated_peer_record)
1759  {
1760  updated_peer_record->last_connection_disposition = last_connection_succeeded;
1761  _potential_peer_db.update_entry(*updated_peer_record);
1762  }
1763  }
1764 
1766  move_peer_to_active_list(originating_peer->shared_from_this());
1767  new_peer_just_added(originating_peer->shared_from_this());
1768  }
1769  }
1770  // else if this was an active connection, then this was just a reply to our periodic address requests.
1771  // we've processed it, there's nothing else to do
1772  }
1773 
1775  const fetch_blockchain_item_ids_message& fetch_blockchain_item_ids_message_received)
1776  {
1778  item_id peers_last_item_seen = item_id(fetch_blockchain_item_ids_message_received.item_type, item_hash_t());
1779  if (fetch_blockchain_item_ids_message_received.blockchain_synopsis.empty())
1780  {
1781  dlog("sync: received a request for item ids starting at the beginning of the chain from peer ${peer_endpoint} (full request: ${synopsis})",
1782  ("peer_endpoint", originating_peer->get_remote_endpoint())
1783  ("synopsis", fetch_blockchain_item_ids_message_received.blockchain_synopsis));
1784  }
1785  else
1786  {
1787  item_hash_t peers_last_item_hash_seen = fetch_blockchain_item_ids_message_received.blockchain_synopsis.back();
1788  dlog("sync: received a request for item ids after ${last_item_seen} from peer ${peer_endpoint} (full request: ${synopsis})",
1789  ("last_item_seen", peers_last_item_hash_seen)
1790  ("peer_endpoint", originating_peer->get_remote_endpoint())
1791  ("synopsis", fetch_blockchain_item_ids_message_received.blockchain_synopsis));
1792  peers_last_item_seen.item_hash = peers_last_item_hash_seen;
1793  }
1794 
1796  reply_message.item_type = fetch_blockchain_item_ids_message_received.item_type;
1797  reply_message.total_remaining_item_count = 0;
1798  try
1799  {
1800  reply_message.item_hashes_available = _delegate->get_block_ids(fetch_blockchain_item_ids_message_received.blockchain_synopsis,
1801  reply_message.total_remaining_item_count);
1802  }
1803  catch (const peer_is_on_an_unreachable_fork&)
1804  {
1805  dlog("Peer is on a fork and there's no set of blocks we can provide to switch them to our fork");
1806  // we reply with an empty list as if we had an empty blockchain;
1807  // we don't want to disconnect because they may be able to provide
1808  // us with blocks on their chain
1809  }
1810 
1811  bool disconnect_from_inhibited_peer = false;
1812  // if our client doesn't have any items after the item the peer requested, it will send back
1813  // a list containing the last item the peer requested
1814  idump((reply_message)(fetch_blockchain_item_ids_message_received.blockchain_synopsis));
1815  if( reply_message.item_hashes_available.empty() )
1816  originating_peer->peer_needs_sync_items_from_us = false; /* I have no items in my blockchain */
1817  else if( !fetch_blockchain_item_ids_message_received.blockchain_synopsis.empty() &&
1818  reply_message.item_hashes_available.size() == 1 &&
1819  std::find(fetch_blockchain_item_ids_message_received.blockchain_synopsis.begin(),
1820  fetch_blockchain_item_ids_message_received.blockchain_synopsis.end(),
1821  reply_message.item_hashes_available.back() ) != fetch_blockchain_item_ids_message_received.blockchain_synopsis.end() )
1822  {
1823  /* the last item in the peer's list matches the last item in our list */
1824  originating_peer->peer_needs_sync_items_from_us = false;
1825  if (originating_peer->inhibit_fetching_sync_blocks)
1826  disconnect_from_inhibited_peer = true; // delay disconnecting until after we send our reply to this fetch_blockchain_item_ids_message
1827  }
1828  else
1829  originating_peer->peer_needs_sync_items_from_us = true;
1830 
1831  if (!originating_peer->peer_needs_sync_items_from_us)
1832  {
1833  dlog("sync: peer is already in sync with us");
1834  // if we thought we had all the items this peer had, but now it turns out that we don't
1835  // have the last item it requested to send from,
1836  // we need to kick off another round of synchronization
1837  if (!originating_peer->we_need_sync_items_from_peer &&
1838  !fetch_blockchain_item_ids_message_received.blockchain_synopsis.empty() &&
1839  !_delegate->has_item(peers_last_item_seen))
1840  {
1841  dlog("sync: restarting sync with peer ${peer}", ("peer", originating_peer->get_remote_endpoint()));
1842  start_synchronizing_with_peer(originating_peer->shared_from_this());
1843  }
1844  }
1845  else
1846  {
1847  dlog("sync: peer is out of sync, sending peer ${count} items ids: first: ${first_item_id}, last: ${last_item_id}",
1848  ("count", reply_message.item_hashes_available.size())
1849  ("first_item_id", reply_message.item_hashes_available.front())
1850  ("last_item_id", reply_message.item_hashes_available.back()));
1851  if (!originating_peer->we_need_sync_items_from_peer &&
1852  !fetch_blockchain_item_ids_message_received.blockchain_synopsis.empty() &&
1853  !_delegate->has_item(peers_last_item_seen))
1854  {
1855  dlog("sync: restarting sync with peer ${peer}", ("peer", originating_peer->get_remote_endpoint()));
1856  start_synchronizing_with_peer(originating_peer->shared_from_this());
1857  }
1858  }
1859  originating_peer->send_message(reply_message);
1860 
1861  if (disconnect_from_inhibited_peer)
1862  {
1863  // the peer has all of our blocks, and we don't want any of theirs, so disconnect them
1864  disconnect_from_peer(originating_peer, "you are on a fork that I'm unable to switch to");
1865  return;
1866  }
1867 
1868  if (originating_peer->direction == peer_connection_direction::inbound &&
1869  _handshaking_connections.find(originating_peer->shared_from_this()) != _handshaking_connections.end())
1870  {
1871  // handshaking is done, move the connection to fully active status and start synchronizing
1872  dlog("peer ${endpoint} which was handshaking with us has started synchronizing with us, start syncing with it",
1873  ("endpoint", originating_peer->get_remote_endpoint()));
1874  fc::optional<fc::ip::endpoint> inbound_endpoint = originating_peer->get_endpoint_for_connecting();
1875  if (inbound_endpoint)
1876  {
1877  // mark the connection as successful in the database
1878  potential_peer_record updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_endpoint(*inbound_endpoint);
1880  _potential_peer_db.update_entry(updated_peer_record);
1881  }
1882 
1883  // transition it to our active list
1884  move_peer_to_active_list(originating_peer->shared_from_this());
1885  new_peer_just_added(originating_peer->shared_from_this());
1886  }
1887  }
1888 
1890  {
1892  uint32_t max_number_of_unfetched_items = 0;
1893  for( const peer_connection_ptr& peer : _active_connections )
1894  {
1895  uint32_t this_peer_number_of_unfetched_items = (uint32_t)peer->ids_of_items_to_get.size() + peer->number_of_unfetched_item_ids;
1896  max_number_of_unfetched_items = std::max(max_number_of_unfetched_items,
1897  this_peer_number_of_unfetched_items);
1898  }
1899  return max_number_of_unfetched_items;
1900  }
1901 
1902  // get a blockchain synopsis that makes sense to send to the given peer.
1903  // If the peer isn't yet syncing with us, this is just a synopsis of our active blockchain
1904  // If the peer is syncing with us, it is a synopsis of our active blockchain plus the
1905  // blocks the peer has already told us it has
1906  std::vector<item_hash_t> node_impl::create_blockchain_synopsis_for_peer( const peer_connection* peer )
1907  {
1909  item_hash_t reference_point = peer->last_block_delegate_has_seen;
1910 
1911  // when we call _delegate->get_blockchain_synopsis(), we may yield and there's a
1912  // chance this peer's state will change before we get control back. Save off
1913  // the stuff necessary for generating the synopsis.
1914  // This is pretty expensive, we should find a better way to do this
1915  std::vector<item_hash_t> original_ids_of_items_to_get(peer->ids_of_items_to_get.begin(), peer->ids_of_items_to_get.end());
1916  uint32_t number_of_blocks_after_reference_point = original_ids_of_items_to_get.size();
1917 
1918  std::vector<item_hash_t> synopsis = _delegate->get_blockchain_synopsis(reference_point, number_of_blocks_after_reference_point);
1919 
1920 #if 0
1921  // just for debugging, enable this and set a breakpoint to step through
1922  if (synopsis.empty())
1923  synopsis = _delegate->get_blockchain_synopsis(reference_point, number_of_blocks_after_reference_point);
1924 
1925  // TODO: it's possible that the returned synopsis is empty if the blockchain is empty (that's fine)
1926  // or if the reference point is now past our undo history (that's not).
1927  // in the second case, we should mark this peer as one we're unable to sync with and
1928  // disconnect them.
1929  if (reference_point != item_hash_t() && synopsis.empty())
1930  FC_THROW_EXCEPTION(block_older_than_undo_history, "You are on a fork I'm unable to switch to");
1931 #endif
1932 
1933  if( number_of_blocks_after_reference_point )
1934  {
1935  // then the synopsis is incomplete, add the missing elements from ids_of_items_to_get
1936  uint32_t first_block_num_in_ids_to_get = _delegate->get_block_number(original_ids_of_items_to_get.front());
1937  uint32_t true_high_block_num = first_block_num_in_ids_to_get + original_ids_of_items_to_get.size() - 1;
1938 
1939  // in order to generate a seamless synopsis, we need to be using the same low_block_num as the
1940  // backend code; the first block in the synopsis will be the low block number it used
1941  uint32_t low_block_num = synopsis.empty() ? 1 : _delegate->get_block_number(synopsis.front());
1942 
1943  do
1944  {
1945  if( low_block_num >= first_block_num_in_ids_to_get )
1946  synopsis.push_back(original_ids_of_items_to_get[low_block_num - first_block_num_in_ids_to_get]);
1947  low_block_num += (true_high_block_num - low_block_num + 2 ) / 2;
1948  }
1949  while ( low_block_num <= true_high_block_num );
1950  assert(synopsis.back() == original_ids_of_items_to_get.back());
1951  }
1952  return synopsis;
1953  }
1954 
1955  void node_impl::fetch_next_batch_of_item_ids_from_peer( peer_connection* peer, bool reset_fork_tracking_data_for_peer /* = false */ )
1956  {
1958  if( reset_fork_tracking_data_for_peer )
1959  {
1961  peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hash_t());
1962  }
1963 
1964  fc::oexception synopsis_exception;
1965  try
1966  {
1967  std::vector<item_hash_t> blockchain_synopsis = create_blockchain_synopsis_for_peer( peer );
1968 
1969  item_hash_t last_item_seen = blockchain_synopsis.empty() ? item_hash_t() : blockchain_synopsis.back();
1970  dlog( "sync: sending a request for the next items after ${last_item_seen} to peer ${peer}, (full request is ${blockchain_synopsis})",
1971  ( "last_item_seen", last_item_seen )
1972  ( "peer", peer->get_remote_endpoint() )
1973  ( "blockchain_synopsis", blockchain_synopsis ) );
1974  peer->item_ids_requested_from_peer = boost::make_tuple( blockchain_synopsis, fc::time_point::now() );
1975  peer->send_message( fetch_blockchain_item_ids_message(_sync_item_type, blockchain_synopsis ) );
1976  }
1977  catch (const block_older_than_undo_history& e)
1978  {
1979  synopsis_exception = e;
1980  }
1981  if (synopsis_exception)
1982  disconnect_from_peer(peer, "You are on a fork I'm unable to switch to");
1983  }
1984 
1986  const blockchain_item_ids_inventory_message& blockchain_item_ids_inventory_message_received )
1987  {
1989  // ignore unless we asked for the data
1990  if( originating_peer->item_ids_requested_from_peer )
1991  {
1992  // verify that the peer's the block ids the peer sent is a valid response to our request;
1993  // It should either be an empty list of blocks, or a list of blocks that builds off of one of
1994  // the blocks in the synopsis we sent
1995  if (!blockchain_item_ids_inventory_message_received.item_hashes_available.empty())
1996  {
1997  // what's more, it should be a sequential list of blocks, verify that first
1998  uint32_t first_block_number_in_reponse = _delegate->get_block_number(blockchain_item_ids_inventory_message_received.item_hashes_available.front());
1999  for (unsigned i = 1; i < blockchain_item_ids_inventory_message_received.item_hashes_available.size(); ++i)
2000  {
2001  uint32_t actual_num = _delegate->get_block_number(blockchain_item_ids_inventory_message_received.item_hashes_available[i]);
2002  uint32_t expected_num = first_block_number_in_reponse + i;
2003  if (actual_num != expected_num)
2004  {
2005  wlog("Invalid response from peer ${peer_endpoint}. The list of blocks they provided is not sequential, "
2006  "the ${position}th block in their reply was block number ${actual_num}, "
2007  "but it should have been number ${expected_num}",
2008  ("peer_endpoint", originating_peer->get_remote_endpoint())
2009  ("position", i)
2010  ("actual_num", actual_num)
2011  ("expected_num", expected_num));
2012  fc::exception error_for_peer(FC_LOG_MESSAGE(error,
2013  "You gave an invalid response to my request for sync blocks. The list of blocks you provided is not sequential, "
2014  "the ${position}th block in their reply was block number ${actual_num}, "
2015  "but it should have been number ${expected_num}",
2016  ("position", i)
2017  ("actual_num", actual_num)
2018  ("expected_num", expected_num)));
2019  disconnect_from_peer(originating_peer,
2020  "You gave an invalid response to my request for sync blocks",
2021  true, error_for_peer);
2022  return;
2023  }
2024  }
2025 
2026  const std::vector<item_hash_t>& synopsis_sent_in_request = originating_peer->item_ids_requested_from_peer->get<0>();
2027  const item_hash_t& first_item_hash = blockchain_item_ids_inventory_message_received.item_hashes_available.front();
2028 
2029  if (synopsis_sent_in_request.empty())
2030  {
2031  // if we sent an empty synopsis, we were asking for all blocks, so the first block should be block 1
2032  if (_delegate->get_block_number(first_item_hash) != 1)
2033  {
2034  wlog("Invalid response from peer ${peer_endpoint}. We requested a list of sync blocks starting from the beginning of the chain, "
2035  "but they provided a list of blocks starting with ${first_block}",
2036  ("peer_endpoint", originating_peer->get_remote_endpoint())
2037  ("first_block", first_item_hash));
2038  fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You gave an invalid response for my request for sync blocks. I asked for blocks starting from the beginning of the chain, "
2039  "but you returned a list of blocks starting with ${first_block}",
2040  ("first_block", first_item_hash)));
2041  disconnect_from_peer(originating_peer,
2042  "You gave an invalid response to my request for sync blocks",
2043  true, error_for_peer);
2044  return;
2045  }
2046  }
2047  else // synopsis was not empty, we expect a response building off one of the blocks we sent
2048  {
2049  if (boost::range::find(synopsis_sent_in_request, first_item_hash) == synopsis_sent_in_request.end())
2050  {
2051  wlog("Invalid response from peer ${peer_endpoint}. We requested a list of sync blocks based on the synopsis ${synopsis}, but they "
2052  "provided a list of blocks starting with ${first_block}",
2053  ("peer_endpoint", originating_peer->get_remote_endpoint())
2054  ("synopsis", synopsis_sent_in_request)
2055  ("first_block", first_item_hash));
2056  fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You gave an invalid response for my request for sync blocks. I asked for blocks following something in "
2057  "${synopsis}, but you returned a list of blocks starting with ${first_block} which wasn't one of your choices",
2058  ("synopsis", synopsis_sent_in_request)
2059  ("first_block", first_item_hash)));
2060  disconnect_from_peer(originating_peer,
2061  "You gave an invalid response to my request for sync blocks",
2062  true, error_for_peer);
2063  return;
2064  }
2065  }
2066  }
2067  originating_peer->item_ids_requested_from_peer.reset();
2068 
2069  // if exceptions are throw after clearing the item_ids_requested_from_peer (above),
2070  // it could leave our sync in a stalled state. Wrap a try/catch around the rest
2071  // of the function so we can log if this ever happens.
2072  try
2073  {
2074  dlog( "sync: received a list of ${count} available items from ${peer_endpoint}",
2075  ( "count", blockchain_item_ids_inventory_message_received.item_hashes_available.size() )
2076  ( "peer_endpoint", originating_peer->get_remote_endpoint() ) );
2077  //for( const item_hash_t& item_hash : blockchain_item_ids_inventory_message_received.item_hashes_available )
2078  //{
2079  // dlog( "sync: ${hash}", ("hash", item_hash ) );
2080  //}
2081 
2082  // if the peer doesn't have any items after the one we asked for
2083  if( blockchain_item_ids_inventory_message_received.total_remaining_item_count == 0 &&
2084  ( blockchain_item_ids_inventory_message_received.item_hashes_available.empty() || // there are no items in the peer's blockchain. this should only happen if our blockchain was empty when we requested, might want to verify that.
2085  ( blockchain_item_ids_inventory_message_received.item_hashes_available.size() == 1 &&
2086  _delegate->has_item( item_id(blockchain_item_ids_inventory_message_received.item_type,
2087  blockchain_item_ids_inventory_message_received.item_hashes_available.front() ) ) ) ) && // we've already seen the last item in the peer's blockchain
2088  originating_peer->ids_of_items_to_get.empty() &&
2089  originating_peer->number_of_unfetched_item_ids == 0 ) // <-- is the last check necessary?
2090  {
2091  dlog( "sync: peer said we're up-to-date, entering normal operation with this peer" );
2092  originating_peer->we_need_sync_items_from_peer = false;
2093 
2094  uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers();
2095  _total_number_of_unfetched_items = new_number_of_unfetched_items;
2096  if( new_number_of_unfetched_items == 0 )
2097  _delegate->sync_status( blockchain_item_ids_inventory_message_received.item_type, 0 );
2098 
2099  return;
2100  }
2101 
2102  std::deque<item_hash_t> item_hashes_received( blockchain_item_ids_inventory_message_received.item_hashes_available.begin(),
2103  blockchain_item_ids_inventory_message_received.item_hashes_available.end() );
2104  originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count;
2105  // flush any items this peer sent us that we've already received and processed from another peer
2106  if (!item_hashes_received.empty() &&
2107  originating_peer->ids_of_items_to_get.empty())
2108  {
2109  bool is_first_item_for_other_peer = false;
2110  for (const peer_connection_ptr& peer : _active_connections)
2111  if (peer != originating_peer->shared_from_this() &&
2112  !peer->ids_of_items_to_get.empty() &&
2113  peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front())
2114  {
2115  dlog("The item ${newitem} is the first item for peer ${peer}",
2116  ("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front())
2117  ("peer", peer->get_remote_endpoint()));
2118  is_first_item_for_other_peer = true;
2119  break;
2120  }
2121  dlog("is_first_item_for_other_peer: ${is_first}. item_hashes_received.size() = ${size}",
2122  ("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size()));
2123  if (!is_first_item_for_other_peer)
2124  {
2125  while (!item_hashes_received.empty() &&
2126  _delegate->has_item(item_id(blockchain_item_ids_inventory_message_received.item_type,
2127  item_hashes_received.front())))
2128  {
2129  assert(item_hashes_received.front() != item_hash_t());
2130  originating_peer->last_block_delegate_has_seen = item_hashes_received.front();
2131  originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front());
2132  dlog("popping item because delegate has already seen it. peer ${peer}'s last block the delegate has seen is now ${block_id} (actual block #${actual_block_num})",
2133  ("peer", originating_peer->get_remote_endpoint())
2134  ("block_id", originating_peer->last_block_delegate_has_seen)
2135  ("actual_block_num", _delegate->get_block_number(item_hashes_received.front())));
2136 
2137  item_hashes_received.pop_front();
2138  }
2139  dlog("after removing all items we have already seen, item_hashes_received.size() = ${size}", ("size", item_hashes_received.size()));
2140  }
2141  }
2142  else if (!item_hashes_received.empty())
2143  {
2144  // we received a list of items and we already have a list of items to fetch from this peer.
2145  // In the normal case, this list will immediately follow the existing list, meaning the
2146  // last hash of our existing list will match the first hash of the new list.
2147 
2148  // In the much less likely case, we've received a partial list of items from the peer, then
2149  // the peer switched forks before sending us the remaining list. In this case, the first
2150  // hash in the new list may not be the last hash in the existing list (it may be earlier, or
2151  // it may not exist at all.
2152 
2153  // In either case, pop items off the back of our existing list until we find our first
2154  // item, then append our list.
2155  while (!originating_peer->ids_of_items_to_get.empty())
2156  {
2157  if (item_hashes_received.front() != originating_peer->ids_of_items_to_get.back())
2158  originating_peer->ids_of_items_to_get.pop_back();
2159  else
2160  break;
2161  }
2162  if (originating_peer->ids_of_items_to_get.empty())
2163  {
2164  // this happens when the peer has switched forks between the last inventory message and
2165  // this one, and there weren't any unfetched items in common
2166  // We don't know where in the blockchain the new front() actually falls, all we can
2167  // expect is that it is a block that we knew about because it should be one of the
2168  // blocks we sent in the initial synopsis.
2169  assert(_delegate->has_item(item_id(_sync_item_type, item_hashes_received.front())));
2170  originating_peer->last_block_delegate_has_seen = item_hashes_received.front();
2171  originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front());
2172  item_hashes_received.pop_front();
2173  }
2174  else
2175  {
2176  // the common simple case: the new list extends the old. pop off the duplicate element
2177  originating_peer->ids_of_items_to_get.pop_back();
2178  }
2179  }
2180 
2181  if (!item_hashes_received.empty() && !originating_peer->ids_of_items_to_get.empty())
2182  assert(item_hashes_received.front() != originating_peer->ids_of_items_to_get.back());
2183 
2184  // at any given time, there's a maximum number of blocks that can possibly be out there
2185  // [(now - genesis time) / block interval]. If they offer us more blocks than that,
2186  // they must be an attacker or have a buggy client.
2187  fc::time_point_sec minimum_time_of_last_offered_block =
2188  originating_peer->last_block_time_delegate_has_seen + // timestamp of the block immediately before the first unfetched block
2191  if (minimum_time_of_last_offered_block > now + GRAPHENE_NET_FUTURE_SYNC_BLOCKS_GRACE_PERIOD_SEC)
2192  {
2193  wlog("Disconnecting from peer ${peer} who offered us an implausible number of blocks, their last block would be in the future (${timestamp})",
2194  ("peer", originating_peer->get_remote_endpoint())
2195  ("timestamp", minimum_time_of_last_offered_block));
2196  fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You offered me a list of more sync blocks than could possibly exist. Total blocks offered: ${blocks}, Minimum time of the last block you offered: ${minimum_time_of_last_offered_block}, Now: ${now}",
2197  ("blocks", originating_peer->number_of_unfetched_item_ids)
2198  ("minimum_time_of_last_offered_block", minimum_time_of_last_offered_block)
2199  ("now", now)));
2200  disconnect_from_peer(originating_peer,
2201  "You offered me a list of more sync blocks than could possibly exist",
2202  true, error_for_peer);
2203  return;
2204  }
2205 
2206  // append the remaining items to the peer's list
2207  boost::push_back(originating_peer->ids_of_items_to_get, item_hashes_received);
2208 
2209  uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers();
2210  if (new_number_of_unfetched_items != _total_number_of_unfetched_items)
2211  _delegate->sync_status(blockchain_item_ids_inventory_message_received.item_type,
2212  new_number_of_unfetched_items);
2213  _total_number_of_unfetched_items = new_number_of_unfetched_items;
2214 
2215  if (blockchain_item_ids_inventory_message_received.total_remaining_item_count != 0)
2216  {
2217  // the peer hasn't sent us all the items it knows about.
2218  if (originating_peer->ids_of_items_to_get.size() > GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH)
2219  {
2220  // we have a good number of item ids from this peer, start fetching blocks from it;
2221  // we'll switch back later to finish the job.
2223  }
2224  else
2225  {
2226  // keep fetching the peer's list of sync items until we get enough to switch into block-
2227  // fetchimg mode
2228  fetch_next_batch_of_item_ids_from_peer(originating_peer);
2229  }
2230  }
2231  else
2232  {
2233  // the peer has told us about all of the items it knows
2234  if (!originating_peer->ids_of_items_to_get.empty())
2235  {
2236  // we now know about all of the items the peer knows about, and there are some items on the list
2237  // that we should try to fetch. Kick off the fetch loop.
2239  }
2240  else
2241  {
2242  // If we get here, the peer has sent us a non-empty list of items, but we have already
2243  // received all of the items from other peers. Send a new request to the peer to
2244  // see if we're really in sync
2245  fetch_next_batch_of_item_ids_from_peer(originating_peer);
2246  }
2247  }
2248  }
2249  catch (const fc::canceled_exception&)
2250  {
2251  throw;
2252  }
2253  catch (const fc::exception& e)
2254  {
2255  elog("Caught unexpected exception: ${e}", ("e", e));
2256  assert(false && "exceptions not expected here");
2257  }
2258  catch (const std::exception& e)
2259  {
2260  elog("Caught unexpected exception: ${e}", ("e", e.what()));
2261  assert(false && "exceptions not expected here");
2262  }
2263  catch (...)
2264  {
2265  elog("Caught unexpected exception, could break sync operation");
2266  }
2267  }
2268  else
2269  {
2270  wlog("sync: received a list of sync items available, but I didn't ask for any!");
2271  }
2272  }
2273 
2275  {
2276  try
2277  {
2278  return _message_cache.get_message(item.item_hash);
2279  }
2280  catch (fc::key_not_found_exception&)
2281  {}
2282  try
2283  {
2284  return _delegate->get_item(item);
2285  }
2286  catch (fc::key_not_found_exception&)
2287  {}
2288  return item_not_available_message(item);
2289  }
2290 
2291  void node_impl::on_fetch_items_message(peer_connection* originating_peer, const fetch_items_message& fetch_items_message_received)
2292  {
2294  dlog("received items request for ids ${ids} of type ${type} from peer ${endpoint}",
2295  ("ids", fetch_items_message_received.items_to_fetch)
2296  ("type", fetch_items_message_received.item_type)
2297  ("endpoint", originating_peer->get_remote_endpoint()));
2298 
2299  fc::optional<message> last_block_message_sent;
2300 
2301  std::list<message> reply_messages;
2302  for (const item_hash_t& item_hash : fetch_items_message_received.items_to_fetch)
2303  {
2304  try
2305  {
2306  message requested_message = _message_cache.get_message(item_hash);
2307  dlog("received item request for item ${id} from peer ${endpoint}, returning the item from my message cache",
2308  ("endpoint", originating_peer->get_remote_endpoint())
2309  ("id", requested_message.id()));
2310  reply_messages.push_back(requested_message);
2311  if (fetch_items_message_received.item_type == block_message_type)
2312  last_block_message_sent = requested_message;
2313  continue;
2314  }
2315  catch (fc::key_not_found_exception&)
2316  {
2317  // it wasn't in our local cache, that's ok ask the client
2318  }
2319 
2320  item_id item_to_fetch(fetch_items_message_received.item_type, item_hash);
2321  try
2322  {
2323  message requested_message = _delegate->get_item(item_to_fetch);
2324  dlog("received item request from peer ${endpoint}, returning the item from delegate with id ${id} size ${size}",
2325  ("id", requested_message.id())
2326  ("size", requested_message.size)
2327  ("endpoint", originating_peer->get_remote_endpoint()));
2328  reply_messages.push_back(requested_message);
2329  if (fetch_items_message_received.item_type == block_message_type)
2330  last_block_message_sent = requested_message;
2331  continue;
2332  }
2333  catch (fc::key_not_found_exception&)
2334  {
2335  reply_messages.push_back(item_not_available_message(item_to_fetch));
2336  dlog("received item request from peer ${endpoint} but we don't have it",
2337  ("endpoint", originating_peer->get_remote_endpoint()));
2338  }
2339  }
2340 
2341  // if we sent them a block, update our record of the last block they've seen accordingly
2342  if (last_block_message_sent)
2343  {
2344  graphene::net::block_message block = last_block_message_sent->as<graphene::net::block_message>();
2345  originating_peer->last_block_delegate_has_seen = block.block_id;
2346  originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(block.block_id);
2347  }
2348 
2349  for (const message& reply : reply_messages)
2350  {
2351  if (reply.msg_type.value() == block_message_type)
2353  else
2354  originating_peer->send_message(reply);
2355  }
2356  }
2357 
2358  void node_impl::on_item_not_available_message( peer_connection* originating_peer, const item_not_available_message& item_not_available_message_received )
2359  {
2361  const item_id& requested_item = item_not_available_message_received.requested_item;
2362  auto regular_item_iter = originating_peer->items_requested_from_peer.find(requested_item);
2363  if (regular_item_iter != originating_peer->items_requested_from_peer.end())
2364  {
2365  originating_peer->items_requested_from_peer.erase( regular_item_iter );
2366  originating_peer->inventory_peer_advertised_to_us.erase( requested_item );
2367  if (is_item_in_any_peers_inventory(requested_item))
2369  wlog("Peer doesn't have the requested item.");
2371  return;
2372  }
2373 
2374  auto sync_item_iter = originating_peer->sync_items_requested_from_peer.find(requested_item.item_hash);
2375  if (sync_item_iter != originating_peer->sync_items_requested_from_peer.end())
2376  {
2377  originating_peer->sync_items_requested_from_peer.erase(sync_item_iter);
2378 
2379  if (originating_peer->peer_needs_sync_items_from_us)
2380  originating_peer->inhibit_fetching_sync_blocks = true;
2381  else
2382  disconnect_from_peer(originating_peer, "You are missing a sync item you claim to have, your database is probably corrupted. Try --rebuild-index.",true,
2383  fc::exception(FC_LOG_MESSAGE(error,"You are missing a sync item you claim to have, your database is probably corrupted. Try --rebuild-index.",
2384  ("item_id", requested_item))));
2385  wlog("Peer doesn't have the requested sync item. This really shouldn't happen");
2387  return;
2388  }
2389 
2390  dlog("Peer doesn't have an item we're looking for, which is fine because we weren't looking for it");
2391  }
2392 
2393  void node_impl::on_item_ids_inventory_message(peer_connection* originating_peer, const item_ids_inventory_message& item_ids_inventory_message_received)
2394  {
2396 
2397  // expire old inventory so we'll be making decisions our about whether to fetch blocks below based only on recent inventory
2398  originating_peer->clear_old_inventory();
2399 
2400  dlog( "received inventory of ${count} items from peer ${endpoint}",
2401  ( "count", item_ids_inventory_message_received.item_hashes_available.size() )("endpoint", originating_peer->get_remote_endpoint() ) );
2402  for( const item_hash_t& item_hash : item_ids_inventory_message_received.item_hashes_available )
2403  {
2404  item_id advertised_item_id(item_ids_inventory_message_received.item_type, item_hash);
2405  bool we_advertised_this_item_to_a_peer = false;
2406  bool we_requested_this_item_from_a_peer = false;
2407  for (const peer_connection_ptr peer : _active_connections)
2408  {
2409  if (peer->inventory_advertised_to_peer.find(advertised_item_id) != peer->inventory_advertised_to_peer.end())
2410  {
2411  we_advertised_this_item_to_a_peer = true;
2412  break;
2413  }
2414  if (peer->items_requested_from_peer.find(advertised_item_id) != peer->items_requested_from_peer.end())
2415  we_requested_this_item_from_a_peer = true;
2416  }
2417 
2418  // if we have already advertised it to a peer, we must have it, no need to do anything else
2419  if (!we_advertised_this_item_to_a_peer)
2420  {
2421  // if the peer has flooded us with transactions, don't add these to the inventory to prevent our
2422  // inventory list from growing without bound. We try to allow fetching blocks even when
2423  // we've stopped fetching transactions.
2424  if ((item_ids_inventory_message_received.item_type == graphene::net::trx_message_type &&
2426  originating_peer->is_inventory_advertised_to_us_list_full())
2427  break;
2428  originating_peer->inventory_peer_advertised_to_us.insert(peer_connection::timestamped_item_id(advertised_item_id, fc::time_point::now()));
2429  if (!we_requested_this_item_from_a_peer)
2430  {
2431  if (_recently_failed_items.find(item_id(item_ids_inventory_message_received.item_type, item_hash)) != _recently_failed_items.end())
2432  {
2433  dlog("not adding ${item_hash} to our list of items to fetch because we've recently fetched a copy and it failed to push",
2434  ("item_hash", item_hash));
2435  }
2436  else
2437  {
2438  auto items_to_fetch_iter = _items_to_fetch.get<item_id_index>().find(advertised_item_id);
2439  if (items_to_fetch_iter == _items_to_fetch.get<item_id_index>().end())
2440  {
2441  // it's new to us
2443  dlog("adding item ${item_hash} from inventory message to our list of items to fetch",
2444  ("item_hash", item_hash));
2446  }
2447  else
2448  {
2449  // another peer has told us about this item already, but this peer just told us it has the item
2450  // too, we can expect it to be around in this peer's cache for longer, so update its timestamp
2451  _items_to_fetch.get<item_id_index>().modify(items_to_fetch_iter,
2452  [](prioritized_item_id& item) { item.timestamp = fc::time_point::now(); });
2453  }
2454  }
2455  }
2456  }
2457  }
2458 
2459  }
2460 
2462  const closing_connection_message& closing_connection_message_received )
2463  {
2465  originating_peer->they_have_requested_close = true;
2466 
2467  if( closing_connection_message_received.closing_due_to_error )
2468  {
2469  wlog( "Peer ${peer} is disconnecting us because of an error: ${msg}, exception: ${error}",
2470  ( "peer", originating_peer->get_remote_endpoint() )
2471  ( "msg", closing_connection_message_received.reason_for_closing )
2472  ( "error", closing_connection_message_received.error ) );
2473  std::ostringstream message;
2474  message << "Peer " << fc::variant( originating_peer->get_remote_endpoint(),
2475  GRAPHENE_NET_MAX_NESTED_OBJECTS ).as_string() <<
2476  " disconnected us: " << closing_connection_message_received.reason_for_closing;
2477  fc::exception detailed_error(FC_LOG_MESSAGE(warn,
2478  "Peer ${peer} is disconnecting us because of an error: ${msg}, exception: ${error}",
2479  ( "peer", originating_peer->get_remote_endpoint() )
2480  ( "msg", closing_connection_message_received.reason_for_closing )
2481  ( "error", closing_connection_message_received.error ) ));
2482  _delegate->error_encountered( message.str(),
2483  detailed_error );
2484  }
2485  else
2486  {
2487  wlog( "Peer ${peer} is disconnecting us because: ${msg}",
2488  ( "peer", originating_peer->get_remote_endpoint() )
2489  ( "msg", closing_connection_message_received.reason_for_closing ) );
2490  }
2491  if( originating_peer->we_have_requested_close )
2492  originating_peer->close_connection();
2493  }
2494 
2496  {
2498  peer_connection_ptr originating_peer_ptr = originating_peer->shared_from_this();
2499  _rate_limiter.remove_tcp_socket( &originating_peer->get_socket() );
2500 
2501  // if we closed the connection (due to timeout or handshake failure), we should have recorded an
2502  // error message to store in the peer database when we closed the connection
2503  fc::optional<fc::ip::endpoint> inbound_endpoint = originating_peer->get_endpoint_for_connecting();
2504  if (originating_peer->connection_closed_error && inbound_endpoint)
2505  {
2506  fc::optional<potential_peer_record> updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint);
2507  if (updated_peer_record)
2508  {
2509  updated_peer_record->last_error = *originating_peer->connection_closed_error;
2510  _potential_peer_db.update_entry(*updated_peer_record);
2511  }
2512  }
2513 
2514  _closing_connections.erase(originating_peer_ptr);
2515  _handshaking_connections.erase(originating_peer_ptr);
2516  _terminating_connections.erase(originating_peer_ptr);
2517  if (_active_connections.find(originating_peer_ptr) != _active_connections.end())
2518  {
2519  _active_connections.erase(originating_peer_ptr);
2520 
2521  if (inbound_endpoint && originating_peer_ptr->get_remote_endpoint())
2522  {
2523  fc::optional<potential_peer_record> updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint);
2524  if (updated_peer_record)
2525  {
2526  updated_peer_record->last_seen_time = fc::time_point::now();
2527  _potential_peer_db.update_entry(*updated_peer_record);
2528  }
2529  }
2530  }
2531 
2532  ilog("Remote peer ${endpoint} closed their connection to us", ("endpoint", originating_peer->get_remote_endpoint()));
2535 
2536  // notify the node delegate so it can update the display
2538  {
2540  _delegate->connection_count_changed( _last_reported_number_of_connections );
2541  }
2542 
2543  // if we had delegated a firewall check to this peer, send it to another peer
2544  if (originating_peer->firewall_check_state)
2545  {
2546  if (originating_peer->firewall_check_state->requesting_peer != node_id_t())
2547  {
2548  // it's a check we're doing for another node
2549  firewall_check_state_data* firewall_check_state = originating_peer->firewall_check_state;
2550  originating_peer->firewall_check_state = nullptr;
2551  forward_firewall_check_to_next_available_peer(firewall_check_state);
2552  }
2553  else
2554  {
2555  // we were asking them to check whether we're firewalled. we'll just let it
2556  // go for now
2557  delete originating_peer->firewall_check_state;
2558  }
2559  }
2560 
2561  // if we had requested any sync or regular items from this peer that we haven't
2562  // received yet, reschedule them to be fetched from another peer
2563  if (!originating_peer->sync_items_requested_from_peer.empty())
2564  {
2565  for (auto sync_item : originating_peer->sync_items_requested_from_peer)
2566  _active_sync_requests.erase(sync_item);
2568  }
2569 
2570  if (!originating_peer->items_requested_from_peer.empty())
2571  {
2572  for (auto item_and_time : originating_peer->items_requested_from_peer)
2573  {
2574  if (is_item_in_any_peers_inventory(item_and_time.first))
2576  }
2578  }
2579 
2580  schedule_peer_for_deletion(originating_peer_ptr);
2581  }
2582 
2584  {
2585  dlog("in send_sync_block_to_node_delegate()");
2586  bool client_accepted_block = false;
2587  bool discontinue_fetching_blocks_from_peer = false;
2588 
2589  fc::oexception handle_message_exception;
2590 
2591  try
2592  {
2593  std::vector<fc::uint160_t> contained_transaction_message_ids;
2594  _delegate->handle_block(block_message_to_send, true, contained_transaction_message_ids);
2595  ilog("Successfully pushed sync block ${num} (id:${id})",
2596  ("num", block_message_to_send.block.block_num())
2597  ("id", block_message_to_send.block_id));
2598  _most_recent_blocks_accepted.push_back(block_message_to_send.block_id);
2599 
2600  client_accepted_block = true;
2601  }
2602  catch (const block_older_than_undo_history& e)
2603  {
2604  wlog("Failed to push sync block ${num} (id:${id}): block is on a fork older than our undo history would "
2605  "allow us to switch to: ${e}",
2606  ("num", block_message_to_send.block.block_num())
2607  ("id", block_message_to_send.block_id)
2608  ("e", (fc::exception)e));
2609  handle_message_exception = e;
2610  discontinue_fetching_blocks_from_peer = true;
2611  }
2612  catch (const fc::canceled_exception&)
2613  {
2614  throw;
2615  }
2616  catch (const fc::exception& e)
2617  {
2618  auto block_num = block_message_to_send.block.block_num();
2619  wlog("Failed to push sync block ${num} (id:${id}): client rejected sync block sent by peer: ${e}",
2620  ("num", block_num)
2621  ("id", block_message_to_send.block_id)
2622  ("e", e));
2623  if( e.code() == block_timestamp_in_future_exception::code_enum::code_value )
2624  {
2625  handle_message_exception = block_timestamp_in_future_exception( FC_LOG_MESSAGE( warn, "",
2626  ("block_header", static_cast<graphene::protocol::block_header>(block_message_to_send.block))
2627  ("block_num", block_num)
2628  ("block_id", block_message_to_send.block_id) ) );
2629  }
2630  else
2631  handle_message_exception = e;
2632  }
2633 
2634  // build up lists for any potentially-blocking operations we need to do, then do them
2635  // at the end of this function
2636  std::set<peer_connection_ptr> peers_with_newly_empty_item_lists;
2637  std::set<peer_connection_ptr> peers_we_need_to_sync_to;
2638  std::map<peer_connection_ptr, std::pair<std::string, fc::oexception> > peers_to_disconnect; // map peer -> pair<reason_string, exception>
2639 
2640  if( client_accepted_block )
2641  {
2643  dlog("sync: client accpted the block, we now have only ${count} items left to fetch before we're in sync",
2645  bool is_fork_block = is_hard_fork_block(block_message_to_send.block.block_num());
2646  for (const peer_connection_ptr& peer : _active_connections)
2647  {
2648  ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
2649  bool disconnecting_this_peer = false;
2650  if (is_fork_block)
2651  {
2652  // we just pushed a hard fork block. Find out if this peer is running a client
2653  // that will be unable to process future blocks
2654  if (peer->last_known_fork_block_number != 0)
2655  {
2656  uint32_t next_fork_block_number = get_next_known_hard_fork_block_number(peer->last_known_fork_block_number);
2657  if (next_fork_block_number != 0 &&
2658  next_fork_block_number <= block_message_to_send.block.block_num())
2659  {
2660  std::ostringstream disconnect_reason_stream;
2661  disconnect_reason_stream << "You need to upgrade your client due to hard fork at block " << block_message_to_send.block.block_num();
2662  peers_to_disconnect[peer] = std::make_pair(disconnect_reason_stream.str(),
2663  fc::oexception(fc::exception(FC_LOG_MESSAGE(error, "You need to upgrade your client due to hard fork at block ${block_number}",
2664  ("block_number", block_message_to_send.block.block_num())))));
2665 #ifdef ENABLE_DEBUG_ULOGS
2666  ulog("Disconnecting from peer during sync because their version is too old. Their version date: ${date}", ("date", peer->graphene_git_revision_unix_timestamp));
2667 #endif
2668  disconnecting_this_peer = true;
2669  }
2670  }
2671  }
2672  if (!disconnecting_this_peer &&
2673  peer->ids_of_items_to_get.empty() && peer->ids_of_items_being_processed.empty())
2674  {
2675  dlog( "Cannot pop first element off peer ${peer}'s list, its list is empty", ("peer", peer->get_remote_endpoint() ) );
2676  // we don't know for sure that this peer has the item we just received.
2677  // If peer is still syncing to us, we know they will ask us for
2678  // sync item ids at least one more time and we'll notify them about
2679  // the item then, so there's no need to do anything. If we still need items
2680  // from them, we'll be asking them for more items at some point, and
2681  // that will clue them in that they are out of sync. If we're fully in sync
2682  // we need to kick off another round of synchronization with them so they can
2683  // find out about the new item.
2684  if (!peer->peer_needs_sync_items_from_us && !peer->we_need_sync_items_from_peer)
2685  {
2686  dlog("We will be restarting synchronization with peer ${peer}", ("peer", peer->get_remote_endpoint()));
2687  peers_we_need_to_sync_to.insert(peer);
2688  }
2689  }
2690  else if (!disconnecting_this_peer)
2691  {
2692  auto items_being_processed_iter = peer->ids_of_items_being_processed.find(block_message_to_send.block_id);
2693  if (items_being_processed_iter != peer->ids_of_items_being_processed.end())
2694  {
2695  peer->last_block_delegate_has_seen = block_message_to_send.block_id;
2696  peer->last_block_time_delegate_has_seen = block_message_to_send.block.timestamp;
2697 
2698  peer->ids_of_items_being_processed.erase(items_being_processed_iter);
2699  dlog("Removed item from ${endpoint}'s list of items being processed, still processing ${len} blocks",
2700  ("endpoint", peer->get_remote_endpoint())("len", peer->ids_of_items_being_processed.size()));
2701 
2702  // if we just received the last item in our list from this peer, we will want to
2703  // send another request to find out if we are in sync, but we can't do this yet
2704  // (we don't want to allow a fiber swap in the middle of popping items off the list)
2705  if (peer->ids_of_items_to_get.empty() &&
2706  peer->number_of_unfetched_item_ids == 0 &&
2707  peer->ids_of_items_being_processed.empty())
2708  peers_with_newly_empty_item_lists.insert(peer);
2709 
2710  // in this case, we know the peer was offering us this exact item, no need to
2711  // try to inform them of its existence
2712  }
2713  }
2714  }
2715  }
2716  else
2717  {
2718  // invalid message received
2719  for (const peer_connection_ptr& peer : _active_connections)
2720  {
2721  ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
2722 
2723  if (peer->ids_of_items_being_processed.find(block_message_to_send.block_id)
2724  != peer->ids_of_items_being_processed.end())
2725  {
2726  if (discontinue_fetching_blocks_from_peer)
2727  {
2728  wlog("inhibiting fetching sync blocks from peer ${endpoint} because it is on a fork that's too old",
2729  ("endpoint", peer->get_remote_endpoint()));
2730  peer->inhibit_fetching_sync_blocks = true;
2731  }
2732  else
2733  peers_to_disconnect[peer] = std::make_pair(
2734  std::string("You offered us a block that we reject as invalid"),
2735  fc::oexception(handle_message_exception));
2736  }
2737  }
2738  }
2739 
2740  for (auto& peer_to_disconnect : peers_to_disconnect)
2741  {
2742  const peer_connection_ptr& peer = peer_to_disconnect.first;
2743  std::string reason_string;
2744  fc::oexception reason_exception;
2745  std::tie(reason_string, reason_exception) = peer_to_disconnect.second;
2746  wlog("disconnecting client ${endpoint} because it offered us the rejected block",
2747  ("endpoint", peer->get_remote_endpoint()));
2748  disconnect_from_peer(peer.get(), reason_string, true, reason_exception);
2749  }
2750  for (const peer_connection_ptr& peer : peers_with_newly_empty_item_lists)
2752 
2753  for (const peer_connection_ptr& peer : peers_we_need_to_sync_to)
2755 
2756  dlog("Leaving send_sync_block_to_node_delegate");
2757 
2758  if (// _suspend_fetching_sync_blocks && <-- you can use this if "maximum_number_of_blocks_to_handle_at_one_time" == "maximum_number_of_sync_blocks_to_prefetch"
2762  "process_backlog_of_sync_blocks");
2763  }
2764 
2766  {
2768  // garbage-collect the list of async tasks here for lack of a better place
2769  for (auto calls_iter = _handle_message_calls_in_progress.begin();
2770  calls_iter != _handle_message_calls_in_progress.end();)
2771  {
2772  if (calls_iter->ready())
2773  calls_iter = _handle_message_calls_in_progress.erase(calls_iter);
2774  else
2775  ++calls_iter;
2776  }
2777 
2778  dlog("in process_backlog_of_sync_blocks");
2780  {
2781  dlog("leaving process_backlog_of_sync_blocks because we're already processing too many blocks");
2782  return; // we will be rescheduled when the next block finishes its processing
2783  }
2784  dlog("currently ${count} blocks in the process of being handled", ("count", _handle_message_calls_in_progress.size()));
2785 
2786 
2788  {
2789  dlog("resuming processing sync block backlog because we only ${count} blocks in progress",
2790  ("count", _handle_message_calls_in_progress.size()));
2792  }
2793 
2794 
2795  // when syncing with multiple peers, it's possible that we'll have hundreds of blocks ready to push
2796  // to the client at once. This can be slow, and we need to limit the number we push at any given
2797  // time to allow network traffic to continue so we don't end up disconnecting from peers
2798  //fc::time_point start_time = fc::time_point::now();
2799  //fc::time_point when_we_should_yield = start_time + fc::seconds(1);
2800 
2801  bool block_processed_this_iteration;
2802  unsigned blocks_processed = 0;
2803 
2804  std::set<peer_connection_ptr> peers_with_newly_empty_item_lists;
2805  std::set<peer_connection_ptr> peers_we_need_to_sync_to;
2806  std::map<peer_connection_ptr, fc::oexception> peers_with_rejected_block;
2807 
2808  do
2809  {
2810  std::copy(std::make_move_iterator(_new_received_sync_items.begin()),
2811  std::make_move_iterator(_new_received_sync_items.end()),
2812  std::front_inserter(_received_sync_items));
2813  _new_received_sync_items.clear();
2814  dlog("currently ${count} sync items to consider", ("count", _received_sync_items.size()));
2815 
2816  block_processed_this_iteration = false;
2817  for (auto received_block_iter = _received_sync_items.begin();
2818  received_block_iter != _received_sync_items.end();
2819  ++received_block_iter)
2820  {
2821 
2822  // find out if this block is the next block on the active chain or one of the forks
2823  bool potential_first_block = false;
2824  for (const peer_connection_ptr& peer : _active_connections)
2825  {
2826  ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
2827  if (!peer->ids_of_items_to_get.empty() &&
2828  peer->ids_of_items_to_get.front() == received_block_iter->block_id)
2829  {
2830  potential_first_block = true;
2831  peer->ids_of_items_to_get.pop_front();
2832  peer->ids_of_items_being_processed.insert(received_block_iter->block_id);
2833  }
2834  }
2835 
2836  // if it is, process it, remove it from all sync peers lists
2837  if (potential_first_block)
2838  {
2839  // we can get into an interesting situation near the end of synchronization. We can be in
2840  // sync with one peer who is sending us the last block on the chain via a regular inventory
2841  // message, while at the same time still be synchronizing with a peer who is sending us the
2842  // block through the sync mechanism. Further, we must request both blocks because
2843  // we don't know they're the same (for the peer in normal operation, it has only told us the
2844  // message id, for the peer in the sync case we only known the block_id).
2845  if (std::find(_most_recent_blocks_accepted.begin(), _most_recent_blocks_accepted.end(),
2846  received_block_iter->block_id) == _most_recent_blocks_accepted.end())
2847  {
2848  graphene::net::block_message block_message_to_process = *received_block_iter;
2849  _received_sync_items.erase(received_block_iter);
2850  _handle_message_calls_in_progress.emplace_back(fc::async([this, block_message_to_process](){
2851  send_sync_block_to_node_delegate(block_message_to_process);
2852  }, "send_sync_block_to_node_delegate"));
2853  ++blocks_processed;
2854  block_processed_this_iteration = true;
2855  }
2856  else
2857  {
2858  dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted");
2859  std::vector< peer_connection_ptr > peers_needing_next_batch;
2860  for (const peer_connection_ptr& peer : _active_connections)
2861  {
2862  auto items_being_processed_iter = peer->ids_of_items_being_processed.find(received_block_iter->block_id);
2863  if (items_being_processed_iter != peer->ids_of_items_being_processed.end())
2864  {
2865  peer->ids_of_items_being_processed.erase(items_being_processed_iter);
2866  dlog("Removed item from ${endpoint}'s list of items being processed, still processing ${len} blocks",
2867  ("endpoint", peer->get_remote_endpoint())("len", peer->ids_of_items_being_processed.size()));
2868 
2869  // if we just processed the last item in our list from this peer, we will want to
2870  // send another request to find out if we are now in sync (this is normally handled in
2871  // send_sync_block_to_node_delegate)
2872  if (peer->ids_of_items_to_get.empty() &&
2873  peer->number_of_unfetched_item_ids == 0 &&
2874  peer->ids_of_items_being_processed.empty())
2875  {
2876  dlog("We received last item in our list for peer ${endpoint}, setup to do a sync check", ("endpoint", peer->get_remote_endpoint()));
2877  peers_needing_next_batch.push_back( peer );
2878  }
2879  }
2880  }
2881  for( const peer_connection_ptr& peer : peers_needing_next_batch )
2883  }
2884 
2885  break; // start iterating _received_sync_items from the beginning
2886  } // end if potential_first_block
2887  } // end for each block in _received_sync_items
2888 
2890  {
2891  dlog("stopping processing sync block backlog because we have ${count} blocks in progress",
2892  ("count", _handle_message_calls_in_progress.size()));
2893  //ulog("stopping processing sync block backlog because we have ${count} blocks in progress, total on hand: ${received}",
2894  // ("count", _handle_message_calls_in_progress.size())("received", _received_sync_items.size()));
2897  break;
2898  }
2899  } while (block_processed_this_iteration);
2900 
2901  dlog("leaving process_backlog_of_sync_blocks, ${count} processed", ("count", blocks_processed));
2902 
2905  }
2906 
2908  {
2909  if (!_node_is_shutting_down &&
2911  _process_backlog_of_sync_blocks_done = fc::async([=](){ process_backlog_of_sync_blocks(); }, "process_backlog_of_sync_blocks");
2912  }
2913 
2915  const graphene::net::block_message& block_message_to_process, const message_hash_type& message_hash )
2916  {
2918  dlog( "received a sync block from peer ${endpoint}", ("endpoint", originating_peer->get_remote_endpoint() ) );
2919 
2920  // add it to the front of _received_sync_items, then process _received_sync_items to try to
2921  // pass as many messages as possible to the client.
2922  _new_received_sync_items.push_front( block_message_to_process );
2924  }
2925 
2927  const graphene::net::block_message& block_message_to_process,
2928  const message_hash_type& message_hash )
2929  {
2930  fc::time_point message_receive_time = fc::time_point::now();
2931 
2932  dlog( "received a block from peer ${endpoint}, passing it to client", ("endpoint", originating_peer->get_remote_endpoint() ) );
2933  std::set<peer_connection_ptr> peers_to_disconnect;
2934  std::string disconnect_reason;
2935  fc::oexception disconnect_exception;
2936  fc::oexception restart_sync_exception;
2937  try
2938  {
2939  // we can get into an intersting situation near the end of synchronization. We can be in
2940  // sync with one peer who is sending us the last block on the chain via a regular inventory
2941  // message, while at the same time still be synchronizing with a peer who is sending us the
2942  // block through the sync mechanism. Further, we must request both blocks because
2943  // we don't know they're the same (for the peer in normal operation, it has only told us the
2944  // message id, for the peer in the sync case we only known the block_id).
2945  fc::time_point message_validated_time;
2946  if (std::find(_most_recent_blocks_accepted.begin(), _most_recent_blocks_accepted.end(),
2947  block_message_to_process.block_id) == _most_recent_blocks_accepted.end())
2948  {
2949  std::vector<fc::uint160_t> contained_transaction_message_ids;
2950  _delegate->handle_block(block_message_to_process, false, contained_transaction_message_ids);
2951  message_validated_time = fc::time_point::now();
2952  ilog("Successfully pushed block ${num} (id:${id})",
2953  ("num", block_message_to_process.block.block_num())
2954  ("id", block_message_to_process.block_id));
2955  _most_recent_blocks_accepted.push_back(block_message_to_process.block_id);
2956 
2957  bool new_transaction_discovered = false;
2958  for (const item_hash_t& transaction_message_hash : contained_transaction_message_ids)
2959  {
2960  /*size_t items_erased =*/ _items_to_fetch.get<item_id_index>().erase(item_id(trx_message_type, transaction_message_hash));
2961  // there are two ways we could behave here: we could either act as if we received
2962  // the transaction outside the block and offer it to our peers, or we could just
2963  // forget about it (we would still advertise this block to our peers so they should
2964  // get the transaction through that mechanism).
2965  // We take the second approach, bring in the next if block to try the first approach
2966  //if (items_erased)
2967  //{
2968  // new_transaction_discovered = true;
2969  // _new_inventory.insert(item_id(trx_message_type, transaction_message_hash));
2970  //}
2971  }
2972  if (new_transaction_discovered)
2974  }
2975  else
2976  dlog( "Already received and accepted this block (presumably through sync mechanism), treating it as accepted" );
2977 
2978  dlog( "client validated the block, advertising it to other peers" );
2979 
2980  item_id block_message_item_id(core_message_type_enum::block_message_type, message_hash);
2981  uint32_t block_number = block_message_to_process.block.block_num();
2982  fc::time_point_sec block_time = block_message_to_process.block.timestamp;
2983 
2984  for (const peer_connection_ptr& peer : _active_connections)
2985  {
2986  ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
2987 
2988  auto iter = peer->inventory_peer_advertised_to_us.find(block_message_item_id);
2989  if (iter != peer->inventory_peer_advertised_to_us.end())
2990  {
2991  // this peer offered us the item. It will eventually expire from the peer's
2992  // inventory_peer_advertised_to_us list after some time has passed (currently 2 minutes).
2993  // For now, it will remain there, which will prevent us from offering the peer this
2994  // block back when we rebroadcast the block below
2995  peer->last_block_delegate_has_seen = block_message_to_process.block_id;
2996  peer->last_block_time_delegate_has_seen = block_time;
2997  }
2998  peer->clear_old_inventory();
2999  }
3000  message_propagation_data propagation_data{message_receive_time, message_validated_time, originating_peer->node_id};
3001  broadcast( block_message_to_process, propagation_data );
3003 
3004  if (is_hard_fork_block(block_number))
3005  {
3006  // we just pushed a hard fork block. Find out if any of our peers are running clients
3007  // that will be unable to process future blocks
3008  for (const peer_connection_ptr& peer : _active_connections)
3009  {
3010  if (peer->last_known_fork_block_number != 0)
3011  {
3012  uint32_t next_fork_block_number = get_next_known_hard_fork_block_number(peer->last_known_fork_block_number);
3013  if (next_fork_block_number != 0 &&
3014  next_fork_block_number <= block_number)
3015  {
3016  peers_to_disconnect.insert(peer);
3017 #ifdef ENABLE_DEBUG_ULOGS
3018  ulog("Disconnecting from peer because their version is too old. Their version date: ${date}", ("date", peer->graphene_git_revision_unix_timestamp));
3019 #endif
3020  }
3021  }
3022  }
3023  if (!peers_to_disconnect.empty())
3024  {
3025  std::ostringstream disconnect_reason_stream;
3026  disconnect_reason_stream << "You need to upgrade your client due to hard fork at block " << block_number;
3027  disconnect_reason = disconnect_reason_stream.str();
3028  disconnect_exception = fc::exception(FC_LOG_MESSAGE(error, "You need to upgrade your client due to hard fork at block ${block_number}",
3029  ("block_number", block_number)));
3030  }
3031  }
3032  }
3033  catch (const fc::canceled_exception&)
3034  {
3035  throw;
3036  }
3037  catch (const unlinkable_block_exception& e)
3038  {
3039  restart_sync_exception = e;
3040  }
3041  catch (const fc::exception& e)
3042  {
3043  // client rejected the block. Disconnect the client and any other clients that offered us this block
3044  auto block_num = block_message_to_process.block.block_num();
3045  wlog("Failed to push block ${num} (id:${id}), client rejected block sent by peer: ${e}",
3046  ("num", block_num)
3047  ("id", block_message_to_process.block_id)
3048  ("e",e));
3049 
3050  if( e.code() == block_timestamp_in_future_exception::code_enum::code_value )
3051  {
3052  disconnect_exception = block_timestamp_in_future_exception( FC_LOG_MESSAGE( warn, "",
3053  ("block_header", static_cast<graphene::protocol::block_header>(block_message_to_process.block))
3054  ("block_num", block_num)
3055  ("block_id", block_message_to_process.block_id) ) );
3056  }
3057  else
3058  disconnect_exception = e;
3059  disconnect_reason = "You offered me a block that I have deemed to be invalid";
3060 
3061  peers_to_disconnect.insert( originating_peer->shared_from_this() );
3062  for (const peer_connection_ptr& peer : _active_connections)
3063  if (!peer->ids_of_items_to_get.empty() && peer->ids_of_items_to_get.front() == block_message_to_process.block_id)
3064  peers_to_disconnect.insert(peer);
3065  }
3066 
3067  if (restart_sync_exception)
3068  {
3069  wlog("Peer ${peer} sent me a block that didn't link to our blockchain. Restarting sync mode with them to get the missing block. "
3070  "Error pushing block was: ${e}",
3071  ("peer", originating_peer->get_remote_endpoint())
3072  ("e", *restart_sync_exception));
3073  start_synchronizing_with_peer(originating_peer->shared_from_this());
3074  }
3075 
3076  for (const peer_connection_ptr& peer : peers_to_disconnect)
3077  {
3078  wlog("disconnecting client ${endpoint} because it offered us the rejected block", ("endpoint", peer->get_remote_endpoint()));
3079  disconnect_from_peer(peer.get(), disconnect_reason, true, *disconnect_exception);
3080  }
3081  }
3083  const message& message_to_process,
3084  const message_hash_type& message_hash)
3085  {
3087  // find out whether we requested this item while we were synchronizing or during normal operation
3088  // (it's possible that we request an item during normal operation and then get kicked into sync
3089  // mode before we receive and process the item. In that case, we should process the item as a normal
3090  // item to avoid confusing the sync code)
3091  graphene::net::block_message block_message_to_process(message_to_process.as<graphene::net::block_message>());
3092  auto item_iter = originating_peer->items_requested_from_peer.find(item_id(graphene::net::block_message_type, message_hash));
3093  if (item_iter != originating_peer->items_requested_from_peer.end())
3094  {
3095  originating_peer->items_requested_from_peer.erase(item_iter);
3096  process_block_during_normal_operation(originating_peer, block_message_to_process, message_hash);
3097  if (originating_peer->idle())
3099  return;
3100  }
3101  else
3102  {
3103  // not during normal operation. see if we requested it during sync
3104  auto sync_item_iter = originating_peer->sync_items_requested_from_peer.find( block_message_to_process.block_id);
3105  if (sync_item_iter != originating_peer->sync_items_requested_from_peer.end())
3106  {
3107  originating_peer->sync_items_requested_from_peer.erase(sync_item_iter);
3108  // if exceptions are throw here after removing the sync item from the list (above),
3109  // it could leave our sync in a stalled state. Wrap a try/catch around the rest
3110  // of the function so we can log if this ever happens.
3111  try
3112  {
3113  originating_peer->last_sync_item_received_time = fc::time_point::now();
3114  _active_sync_requests.erase(block_message_to_process.block_id);
3115  process_block_during_sync(originating_peer, block_message_to_process, message_hash);
3116  if (originating_peer->idle())
3117  {
3118  // we have finished fetching a batch of items, so we either need to grab another batch of items
3119  // or we need to get another list of item ids.
3120  if (originating_peer->number_of_unfetched_item_ids > 0 &&
3122  fetch_next_batch_of_item_ids_from_peer(originating_peer);
3123  else
3125  }
3126  return;
3127  }
3128  catch (const fc::canceled_exception& e)
3129  {
3130  throw;
3131  }
3132  catch (const fc::exception& e)
3133  {
3134  elog("Caught unexpected exception: ${e}", ("e", e));
3135  assert(false && "exceptions not expected here");
3136  }
3137  catch (const std::exception& e)
3138  {
3139  elog("Caught unexpected exception: ${e}", ("e", e.what()));
3140  assert(false && "exceptions not expected here");
3141  }
3142  catch (...)
3143  {
3144  elog("Caught unexpected exception, could break sync operation");
3145  }
3146  }
3147  }
3148 
3149  // if we get here, we didn't request the message, we must have a misbehaving peer
3150  wlog("received a block ${block_id} I didn't ask for from peer ${endpoint}, disconnecting from peer",
3151  ("endpoint", originating_peer->get_remote_endpoint())
3152  ("block_id", block_message_to_process.block_id));
3153  fc::exception detailed_error(FC_LOG_MESSAGE(error, "You sent me a block that I didn't ask for, block_id: ${block_id}",
3154  ("block_id", block_message_to_process.block_id)
3155  ("graphene_git_revision_sha", originating_peer->graphene_git_revision_sha)
3156  ("graphene_git_revision_unix_timestamp", originating_peer->graphene_git_revision_unix_timestamp)
3157  ("fc_git_revision_sha", originating_peer->fc_git_revision_sha)
3158  ("fc_git_revision_unix_timestamp", originating_peer->fc_git_revision_unix_timestamp)));
3159  disconnect_from_peer(originating_peer, "You sent me a block that I didn't ask for", true, detailed_error);
3160  }
3161 
3163  const current_time_request_message& current_time_request_message_received)
3164  {
3166  fc::time_point request_received_time(fc::time_point::now());
3167  current_time_reply_message reply(current_time_request_message_received.request_sent_time,
3168  request_received_time);
3169  originating_peer->send_message(reply, offsetof(current_time_reply_message, reply_transmitted_time));
3170  }
3171 
3173  const current_time_reply_message& current_time_reply_message_received)
3174  {
3176  fc::time_point reply_received_time = fc::time_point::now();
3177  originating_peer->clock_offset = fc::microseconds(((current_time_reply_message_received.request_received_time - current_time_reply_message_received.request_sent_time) +
3178  (current_time_reply_message_received.reply_transmitted_time - reply_received_time)).count() / 2);
3179  originating_peer->round_trip_delay = (reply_received_time - current_time_reply_message_received.request_sent_time) -
3180  (current_time_reply_message_received.reply_transmitted_time - current_time_reply_message_received.request_received_time);
3181  }
3182 
3184  {
3185  for (const peer_connection_ptr& peer : _active_connections)
3186  {
3187  if (firewall_check_state->expected_node_id != peer->node_id && // it's not the node who is asking us to test
3188  !peer->firewall_check_state && // the peer isn't already performing a check for another node
3189  firewall_check_state->nodes_already_tested.find(peer->node_id) == firewall_check_state->nodes_already_tested.end() &&
3190  peer->core_protocol_version >= 106)
3191  {
3192  wlog("forwarding firewall check for node ${to_check} to peer ${checker}",
3193  ("to_check", firewall_check_state->endpoint_to_test)
3194  ("checker", peer->get_remote_endpoint()));
3195  firewall_check_state->nodes_already_tested.insert(peer->node_id);
3196  peer->firewall_check_state = firewall_check_state;
3197  check_firewall_message check_request;
3198  check_request.endpoint_to_check = firewall_check_state->endpoint_to_test;
3199  check_request.node_id = firewall_check_state->expected_node_id;
3200  peer->send_message(check_request);
3201  return;
3202  }
3203  }
3204  wlog("Unable to forward firewall check for node ${to_check} to any other peers, returning 'unable'",
3205  ("to_check", firewall_check_state->endpoint_to_test));
3206 
3207  peer_connection_ptr originating_peer = get_peer_by_node_id(firewall_check_state->expected_node_id);
3208  if (originating_peer)
3209  {
3211  reply.node_id = firewall_check_state->expected_node_id;
3212  reply.endpoint_checked = firewall_check_state->endpoint_to_test;
3214  originating_peer->send_message(reply);
3215  }
3216  delete firewall_check_state;
3217  }
3218 
3220  const check_firewall_message& check_firewall_message_received)
3221  {
3223 
3224  if (check_firewall_message_received.node_id == node_id_t() &&
3225  check_firewall_message_received.endpoint_to_check == fc::ip::endpoint())
3226  {
3227  // originating_peer is asking us to test whether it is firewalled
3228  // we're not going to try to connect back to the originating peer directly,
3229  // instead, we're going to coordinate requests by asking some of our peers
3230  // to try to connect to the originating peer, and relay the results back
3231  wlog("Peer ${peer} wants us to check whether it is firewalled", ("peer", originating_peer->get_remote_endpoint()));
3232  firewall_check_state_data* firewall_check_state = new firewall_check_state_data;
3233  // if they are using the same inbound and outbound port, try connecting to their outbound endpoint.
3234  // if they are using a different inbound port, use their outbound address but the inbound port they reported
3235  fc::ip::endpoint endpoint_to_check = originating_peer->get_socket().remote_endpoint();
3236  if (originating_peer->inbound_port != originating_peer->outbound_port)
3237  endpoint_to_check = fc::ip::endpoint(endpoint_to_check.get_address(), originating_peer->inbound_port);
3238  firewall_check_state->endpoint_to_test = endpoint_to_check;
3239  firewall_check_state->expected_node_id = originating_peer->node_id;
3240  firewall_check_state->requesting_peer = originating_peer->node_id;
3241 
3242  forward_firewall_check_to_next_available_peer(firewall_check_state);
3243  }
3244  else
3245  {
3246  // we're being asked to check another node
3247  // first, find out if we're currently connected to that node. If we are, we
3248  // can't perform the test
3249  if (is_already_connected_to_id(check_firewall_message_received.node_id) ||
3250  is_connection_to_endpoint_in_progress(check_firewall_message_received.endpoint_to_check))
3251  {
3253  reply.node_id = check_firewall_message_received.node_id;
3254  reply.endpoint_checked = check_firewall_message_received.endpoint_to_check;
3256  }
3257  else
3258  {
3259  // we're not connected to them, so we need to set up a connection to them
3260  // to test.
3261  peer_connection_ptr peer_for_testing(peer_connection::make_shared(this));
3262  peer_for_testing->firewall_check_state = new firewall_check_state_data;
3263  peer_for_testing->firewall_check_state->endpoint_to_test = check_firewall_message_received.endpoint_to_check;
3264  peer_for_testing->firewall_check_state->expected_node_id = check_firewall_message_received.node_id;
3265  peer_for_testing->firewall_check_state->requesting_peer = originating_peer->node_id;
3266  peer_for_testing->set_remote_endpoint(check_firewall_message_received.endpoint_to_check);
3267  initiate_connect_to(peer_for_testing);
3268  }
3269  }
3270  }
3271 
3273  const check_firewall_reply_message& check_firewall_reply_message_received)
3274  {
3276 
3277  if (originating_peer->firewall_check_state &&
3278  originating_peer->firewall_check_state->requesting_peer != node_id_t())
3279  {
3280  // then this is a peer that is helping us check the firewalled state of one of our other peers
3281  // and they're reporting back
3282  // if they got a result, return it to the original peer. if they were unable to check,
3283  // we'll try another peer.
3284  wlog("Peer ${reporter} reports firewall check status ${status} for ${peer}",
3285  ("reporter", originating_peer->get_remote_endpoint())
3286  ("status", check_firewall_reply_message_received.result)
3287  ("peer", check_firewall_reply_message_received.endpoint_checked));
3288 
3289  if (check_firewall_reply_message_received.result == firewall_check_result::unable_to_connect ||
3290  check_firewall_reply_message_received.result == firewall_check_result::connection_successful)
3291  {
3293  if (original_peer)
3294  {
3295  if (check_firewall_reply_message_received.result == firewall_check_result::connection_successful)
3296  {
3297  // if we previously thought this peer was firewalled, mark them as not firewalled
3298  if (original_peer->is_firewalled != firewalled_state::not_firewalled)
3299  {
3300 
3301  original_peer->is_firewalled = firewalled_state::not_firewalled;
3302  // there should be no old entry if we thought they were firewalled, so just create a new one
3303  fc::optional<fc::ip::endpoint> inbound_endpoint = originating_peer->get_endpoint_for_connecting();
3304  if (inbound_endpoint)
3305  {
3306  potential_peer_record updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_endpoint(*inbound_endpoint);
3307  updated_peer_record.last_seen_time = fc::time_point::now();
3308  _potential_peer_db.update_entry(updated_peer_record);
3309  }
3310  }
3311  }
3312  original_peer->send_message(check_firewall_reply_message_received);
3313  }
3314  delete originating_peer->firewall_check_state;
3315  originating_peer->firewall_check_state = nullptr;
3316  }
3317  else
3318  {
3319  // they were unable to check for us, ask another peer
3320  firewall_check_state_data* firewall_check_state = originating_peer->firewall_check_state;
3321  originating_peer->firewall_check_state = nullptr;
3322  forward_firewall_check_to_next_available_peer(firewall_check_state);
3323  }
3324  }
3325  else if (originating_peer->firewall_check_state)
3326  {
3327  // this is a reply to a firewall check we initiated.
3328  wlog("Firewall check we initiated has returned with result: ${result}, endpoint = ${endpoint}",
3329  ("result", check_firewall_reply_message_received.result)
3330  ("endpoint", check_firewall_reply_message_received.endpoint_checked));
3331  if (check_firewall_reply_message_received.result == firewall_check_result::connection_successful)
3332  {
3334  _publicly_visible_listening_endpoint = check_firewall_reply_message_received.endpoint_checked;
3335  }
3336  else if (check_firewall_reply_message_received.result == firewall_check_result::unable_to_connect)
3337  {
3340  }
3341  delete originating_peer->firewall_check_state;
3342  originating_peer->firewall_check_state = nullptr;
3343  }
3344  else
3345  {
3346  wlog("Received a firewall check reply to a request I never sent");
3347  }
3348 
3349  }
3350 
3352  const get_current_connections_request_message& get_current_connections_request_message_received)
3353  {
3356 
3358  {
3361 
3362  size_t minutes_to_average = std::min(_average_network_write_speed_minutes.size(), (size_t)15);
3363  boost::circular_buffer<uint32_t>::iterator start_iter = _average_network_write_speed_minutes.end() - minutes_to_average;
3364  reply.upload_rate_fifteen_minutes = std::accumulate(start_iter, _average_network_write_speed_minutes.end(), 0) / (uint32_t)minutes_to_average;
3365  start_iter = _average_network_read_speed_minutes.end() - minutes_to_average;
3366  reply.download_rate_fifteen_minutes = std::accumulate(start_iter, _average_network_read_speed_minutes.end(), 0) / (uint32_t)minutes_to_average;
3367 
3368  minutes_to_average = std::min(_average_network_write_speed_minutes.size(), (size_t)60);
3369  start_iter = _average_network_write_speed_minutes.end() - minutes_to_average;
3370  reply.upload_rate_one_hour = std::accumulate(start_iter, _average_network_write_speed_minutes.end(), 0) / (uint32_t)minutes_to_average;
3371  start_iter = _average_network_read_speed_minutes.end() - minutes_to_average;
3372  reply.download_rate_one_hour = std::accumulate(start_iter, _average_network_read_speed_minutes.end(), 0) / (uint32_t)minutes_to_average;
3373  }
3374 
3376  for (const peer_connection_ptr& peer : _active_connections)
3377  {
3378  ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
3379 
3380  current_connection_data data_for_this_peer;
3381  data_for_this_peer.connection_duration = now.sec_since_epoch() - peer->connection_initiation_time.sec_since_epoch();
3382  if (peer->get_remote_endpoint()) // should always be set for anyone we're actively connected to
3383  data_for_this_peer.remote_endpoint = *peer->get_remote_endpoint();
3384  data_for_this_peer.clock_offset = peer->clock_offset;
3385  data_for_this_peer.round_trip_delay = peer->round_trip_delay;
3386  data_for_this_peer.node_id = peer->node_id;
3387  data_for_this_peer.connection_direction = peer->direction;
3388  data_for_this_peer.firewalled = peer->is_firewalled;
3389  fc::mutable_variant_object user_data;
3390  if (peer->graphene_git_revision_sha)
3391  user_data["graphene_git_revision_sha"] = *peer->graphene_git_revision_sha;
3392  if (peer->graphene_git_revision_unix_timestamp)
3393  user_data["graphene_git_revision_unix_timestamp"] = *peer->graphene_git_revision_unix_timestamp;
3394  if (peer->fc_git_revision_sha)
3395  user_data["fc_git_revision_sha"] = *peer->fc_git_revision_sha;
3396  if (peer->fc_git_revision_unix_timestamp)
3397  user_data["fc_git_revision_unix_timestamp"] = *peer->fc_git_revision_unix_timestamp;
3398  if (peer->platform)
3399  user_data["platform"] = *peer->platform;
3400  if (peer->bitness)
3401  user_data["bitness"] = *peer->bitness;
3402  user_data["user_agent"] = peer->user_agent;
3403 
3404  user_data["last_known_block_hash"] = fc::variant( peer->last_block_delegate_has_seen, 1 );
3405  user_data["last_known_block_number"] = _delegate->get_block_number(peer->last_block_delegate_has_seen);
3406  user_data["last_known_block_time"] = peer->last_block_time_delegate_has_seen;
3407 
3408  data_for_this_peer.user_data = user_data;
3409  reply.current_connections.emplace_back(data_for_this_peer);
3410  }
3411  originating_peer->send_message(reply);
3412  }
3413 
3415  const get_current_connections_reply_message& get_current_connections_reply_message_received)
3416  {
3418  }
3419 
3420 
3421  // this handles any message we get that doesn't require any special processing.
3422  // currently, this is any message other than block messages and p2p-specific
3423  // messages. (transaction messages would be handled here, for example)
3424  // this just passes the message to the client, and does the bookkeeping
3425  // related to requesting and rebroadcasting the message.
3427  const message& message_to_process,
3428  const message_hash_type& message_hash )
3429  {
3431  fc::time_point message_receive_time = fc::time_point::now();
3432 
3433  // only process it if we asked for it
3434  auto iter = originating_peer->items_requested_from_peer.find(
3435  item_id(message_to_process.msg_type.value(), message_hash) );
3436  if( iter == originating_peer->items_requested_from_peer.end() )
3437  {
3438  wlog( "received a message I didn't ask for from peer ${endpoint}, disconnecting from peer",
3439  ( "endpoint", originating_peer->get_remote_endpoint() ) );
3440  fc::exception detailed_error( FC_LOG_MESSAGE(error,
3441  "You sent me a message that I didn't ask for, message_hash: ${message_hash}",
3442  ( "message_hash", message_hash ) ) );
3443  disconnect_from_peer( originating_peer, "You sent me a message that I didn't request", true, detailed_error );
3444  return;
3445  }
3446  else
3447  {
3448  originating_peer->items_requested_from_peer.erase( iter );
3449  if (originating_peer->idle())
3451 
3452  // Next: have the delegate process the message
3453  fc::time_point message_validated_time;
3454  try
3455  {
3456  if (message_to_process.msg_type.value() == trx_message_type)
3457  {
3458  trx_message transaction_message_to_process = message_to_process.as<trx_message>();
3459  dlog( "passing message containing transaction ${trx} to client",
3460  ("trx", transaction_message_to_process.trx.id()) );
3461  _delegate->handle_transaction(transaction_message_to_process);
3462  }
3463  else
3464  _delegate->handle_message( message_to_process );
3465  message_validated_time = fc::time_point::now();
3466  }
3467  catch ( const fc::canceled_exception& )
3468  {
3469  throw;
3470  }
3471  catch ( const fc::exception& e )
3472  {
3473  switch( e.code() )
3474  {
3475  // log common exceptions in debug level
3476  case graphene::chain::duplicate_transaction::code_enum::code_value :
3477  case graphene::chain::limit_order_create_kill_unfilled::code_enum::code_value :
3478  case graphene::chain::limit_order_create_market_not_whitelisted::code_enum::code_value :
3479  case graphene::chain::limit_order_create_market_blacklisted::code_enum::code_value :
3480  case graphene::chain::limit_order_create_selling_asset_unauthorized::code_enum::code_value :
3481  case graphene::chain::limit_order_create_receiving_asset_unauthorized::code_enum::code_value :
3482  case graphene::chain::limit_order_create_insufficient_balance::code_enum::code_value :
3483  case graphene::chain::limit_order_cancel_nonexist_order::code_enum::code_value :
3484  case graphene::chain::limit_order_cancel_owner_mismatch::code_enum::code_value :
3485  dlog( "client rejected message sent by peer ${peer}, ${e}",
3486  ("peer", originating_peer->get_remote_endpoint() )("e", e) );
3487  break;
3488  // log rarer exceptions in warn level
3489  default:
3490  wlog( "client rejected message sent by peer ${peer}, ${e}",
3491  ("peer", originating_peer->get_remote_endpoint() )("e", e) );
3492  break;
3493  }
3494  // record it so we don't try to fetch this item again
3496  item_id( message_to_process.msg_type.value(), message_hash ), fc::time_point::now() ) );
3497  return;
3498  }
3499 
3500  // finally, if the delegate validated the message, broadcast it to our other peers
3501  message_propagation_data propagation_data { message_receive_time, message_validated_time,
3502  originating_peer->node_id };
3503  broadcast( message_to_process, propagation_data );
3504  }
3505  }
3506 
3508  {
3510  peer->ids_of_items_to_get.clear();
3511  peer->number_of_unfetched_item_ids = 0;
3512  peer->we_need_sync_items_from_peer = true;
3513  peer->last_block_delegate_has_seen = item_hash_t();
3514  peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hash_t());
3515  peer->inhibit_fetching_sync_blocks = false;
3517  }
3518 
3520  {
3521  for( const peer_connection_ptr& peer : _active_connections )
3523  }
3524 
3526  {
3528  peer->send_message(current_time_request_message(),
3529  offsetof(current_time_request_message, request_sent_time));
3532  {
3534  _delegate->connection_count_changed( _last_reported_number_of_connections );
3535  }
3536  }
3537 
3539  {
3541 
3542  try
3543  {
3545  }
3546  catch ( const fc::exception& e )
3547  {
3548  wlog( "Exception thrown while closing P2P peer database, ignoring: ${e}", ("e", e) );
3549  }
3550  catch (...)
3551  {
3552  wlog( "Exception thrown while closing P2P peer database, ignoring" );
3553  }
3554 
3555  // First, stop accepting incoming network connections
3556  try
3557  {
3558  _tcp_server.close();
3559  dlog("P2P TCP server closed");
3560  }
3561  catch ( const fc::exception& e )
3562  {
3563  wlog( "Exception thrown while closing P2P TCP server, ignoring: ${e}", ("e", e) );
3564  }
3565  catch (...)
3566  {
3567  wlog( "Exception thrown while closing P2P TCP server, ignoring" );
3568  }
3569 
3570  try
3571  {
3572  _accept_loop_complete.cancel_and_wait("node_impl::close()");
3573  dlog("P2P accept loop terminated");
3574  }
3575  catch ( const fc::exception& e )
3576  {
3577  wlog( "Exception thrown while terminating P2P accept loop, ignoring: ${e}", ("e", e) );
3578  }
3579  catch (...)
3580  {
3581  wlog( "Exception thrown while terminating P2P accept loop, ignoring" );
3582  }
3583 
3584  // terminate all of our long-running loops (these run continuously instead of rescheduling themselves)
3585  try
3586  {
3587  _p2p_network_connect_loop_done.cancel("node_impl::close()");
3588  // cancel() is currently broken, so we need to wake up the task to allow it to finish
3591  dlog("P2P connect loop terminated");
3592  }
3593  catch ( const fc::canceled_exception& )
3594  {
3595  dlog("P2P connect loop terminated");
3596  }
3597  catch ( const fc::exception& e )
3598  {
3599  wlog( "Exception thrown while terminating P2P connect loop, ignoring: ${e}", ("e", e) );
3600  }
3601  catch (...)
3602  {
3603  wlog( "Exception thrown while terminating P2P connect loop, ignoring" );
3604  }
3605 
3606  try
3607  {
3609  dlog("Process backlog of sync items task terminated");
3610  }
3611  catch ( const fc::canceled_exception& )
3612  {
3613  dlog("Process backlog of sync items task terminated");
3614  }
3615  catch ( const fc::exception& e )
3616  {
3617  wlog( "Exception thrown while terminating Process backlog of sync items task, ignoring: ${e}", ("e", e) );
3618  }
3619  catch (...)
3620  {
3621  wlog( "Exception thrown while terminating Process backlog of sync items task, ignoring" );
3622  }
3623 
3624  unsigned handle_message_call_count = 0;
3625  while( true )
3626  {
3627  auto it = _handle_message_calls_in_progress.begin();
3628  if( it == _handle_message_calls_in_progress.end() )
3629  break;
3630  if( it->ready() || it->error() || it->canceled() )
3631  {
3633  continue;
3634  }
3635  ++handle_message_call_count;
3636  try
3637  {
3638  it->cancel_and_wait("node_impl::close()");
3639  dlog("handle_message call #${count} task terminated", ("count", handle_message_call_count));
3640  }
3641  catch ( const fc::canceled_exception& )
3642  {
3643  dlog("handle_message call #${count} task terminated", ("count", handle_message_call_count));
3644  }
3645  catch ( const fc::exception& e )
3646  {
3647  wlog("Exception thrown while terminating handle_message call #${count} task, ignoring: ${e}", ("e", e)("count", handle_message_call_count));
3648  }
3649  catch (...)
3650  {
3651  wlog("Exception thrown while terminating handle_message call #${count} task, ignoring",("count", handle_message_call_count));
3652  }
3653  }
3654 
3655  try
3656  {
3657  _fetch_sync_items_loop_done.cancel("node_impl::close()");
3658  // cancel() is currently broken, so we need to wake up the task to allow it to finish
3661  dlog("Fetch sync items loop terminated");
3662  }
3663  catch ( const fc::canceled_exception& )
3664  {
3665  dlog("Fetch sync items loop terminated");
3666  }
3667  catch ( const fc::exception& e )
3668  {
3669  wlog( "Exception thrown while terminating Fetch sync items loop, ignoring: ${e}", ("e", e) );
3670  }
3671  catch (...)
3672  {
3673  wlog( "Exception thrown while terminating Fetch sync items loop, ignoring" );
3674  }
3675 
3676  try
3677  {
3678  _fetch_item_loop_done.cancel("node_impl::close()");
3679  // cancel() is currently broken, so we need to wake up the task to allow it to finish
3682  dlog("Fetch items loop terminated");
3683  }
3684  catch ( const fc::canceled_exception& )
3685  {
3686  dlog("Fetch items loop terminated");
3687  }
3688  catch ( const fc::exception& e )
3689  {
3690  wlog( "Exception thrown while terminating Fetch items loop, ignoring: ${e}", ("e", e) );
3691  }
3692  catch (...)
3693  {
3694  wlog( "Exception thrown while terminating Fetch items loop, ignoring" );
3695  }
3696 
3697  try
3698  {
3699  _advertise_inventory_loop_done.cancel("node_impl::close()");
3700  // cancel() is currently broken, so we need to wake up the task to allow it to finish
3703  dlog("Advertise inventory loop terminated");
3704  }
3705  catch ( const fc::canceled_exception& )
3706  {
3707  dlog("Advertise inventory loop terminated");
3708  }
3709  catch ( const fc::exception& e )
3710  {
3711  wlog( "Exception thrown while terminating Advertise inventory loop, ignoring: ${e}", ("e", e) );
3712  }
3713  catch (...)
3714  {
3715  wlog( "Exception thrown while terminating Advertise inventory loop, ignoring" );
3716  }
3717 
3718 
3719  // Next, terminate our existing connections. First, close all of the connections nicely.
3720  // This will close the sockets and may result in calls to our "on_connection_closing"
3721  // method to inform us that the connection really closed (or may not if we manage to cancel
3722  // the read loop before it gets an EOF).
3723  // operate off copies of the lists in case they change during iteration
3724  std::list<peer_connection_ptr> all_peers;
3725  boost::push_back(all_peers, _active_connections);
3726  boost::push_back(all_peers, _handshaking_connections);
3727  boost::push_back(all_peers, _closing_connections);
3728 
3729  for (const peer_connection_ptr& peer : all_peers)
3730  {
3731  try
3732  {
3733  peer->destroy_connection();
3734  }
3735  catch ( const fc::exception& e )
3736  {
3737  wlog( "Exception thrown while closing peer connection, ignoring: ${e}", ("e", e) );
3738  }
3739  catch (...)
3740  {
3741  wlog( "Exception thrown while closing peer connection, ignoring" );
3742  }
3743  }
3744 
3745  // and delete all of the peer_connection objects
3746  _active_connections.clear();
3747  _handshaking_connections.clear();
3748  _closing_connections.clear();
3749  all_peers.clear();
3750 
3751  {
3752 #ifdef USE_PEERS_TO_DELETE_MUTEX
3753  fc::scoped_lock<fc::mutex> lock(_peers_to_delete_mutex);
3754 #endif
3755  try
3756  {
3757  _delayed_peer_deletion_task_done.cancel_and_wait("node_impl::close()");
3758  dlog("Delayed peer deletion task terminated");
3759  }
3760  catch ( const fc::exception& e )
3761  {
3762  wlog( "Exception thrown while terminating Delayed peer deletion task, ignoring: ${e}", ("e", e) );
3763  }
3764  catch (...)
3765  {
3766  wlog( "Exception thrown while terminating Delayed peer deletion task, ignoring" );
3767  }
3768  _peers_to_delete.clear();
3769  }
3770 
3771  // Now that there are no more peers that can call methods on us, there should be no
3772  // chance for one of our loops to be rescheduled, so we can safely terminate all of
3773  // our loops now
3774  try
3775  {
3777  dlog("Terminate inactive connections loop terminated");
3778  }
3779  catch ( const fc::exception& e )
3780  {
3781  wlog( "Exception thrown while terminating Terminate inactive connections loop, ignoring: ${e}", ("e", e) );
3782  }
3783  catch (...)
3784  {
3785  wlog( "Exception thrown while terminating Terminate inactive connections loop, ignoring" );
3786  }
3787 
3788  try
3789  {
3791  dlog("Fetch updated peer lists loop terminated");
3792  }
3793  catch ( const fc::exception& e )
3794  {
3795  wlog( "Exception thrown while terminating Fetch updated peer lists loop, ignoring: ${e}", ("e", e) );
3796  }
3797  catch (...)
3798  {
3799  wlog( "Exception thrown while terminating Fetch updated peer lists loop, ignoring" );
3800  }
3801 
3802  try
3803  {
3804  _update_seed_nodes_loop_done.cancel_and_wait("node_impl::close()");
3805  dlog("Update seed nodes loop terminated");
3806  }
3807  catch ( const fc::exception& e )
3808  {
3809  wlog( "Exception thrown while terminating Update seed nodes loop, ignoring: ${e}", ("e", e) );
3810  }
3811  catch (...)
3812  {
3813  wlog( "Exception thrown while terminating Update seed nodes loop, ignoring" );
3814  }
3815 
3816  try
3817  {
3818  _bandwidth_monitor_loop_done.cancel_and_wait("node_impl::close()");
3819  dlog("Bandwidth monitor loop terminated");
3820  }
3821  catch ( const fc::exception& e )
3822  {
3823  wlog( "Exception thrown while terminating Bandwidth monitor loop, ignoring: ${e}", ("e", e) );
3824  }
3825  catch (...)
3826  {
3827  wlog( "Exception thrown while terminating Bandwidth monitor loop, ignoring" );
3828  }
3829 
3830  try
3831  {
3832  _dump_node_status_task_done.cancel_and_wait("node_impl::close()");
3833  dlog("Dump node status task terminated");
3834  }
3835  catch ( const fc::exception& e )
3836  {
3837  wlog( "Exception thrown while terminating Dump node status task, ignoring: ${e}", ("e", e) );
3838  }
3839  catch (...)
3840  {
3841  wlog( "Exception thrown while terminating Dump node status task, ignoring" );
3842  }
3843  } // node_impl::close()
3844 
3846  {
3848  new_peer->accept_connection(); // this blocks until the secure connection is fully negotiated
3849  send_hello_message(new_peer);
3850  }
3851 
3853  {
3855  while ( !_accept_loop_complete.canceled() )
3856  {
3858 
3859  try
3860  {
3861  _tcp_server.accept( new_peer->get_socket() );
3862  ilog( "accepted inbound connection from ${remote_endpoint}", ("remote_endpoint", new_peer->get_socket().remote_endpoint() ) );
3864  return;
3865  new_peer->connection_initiation_time = fc::time_point::now();
3866  _handshaking_connections.insert( new_peer );
3867  _rate_limiter.add_tcp_socket( &new_peer->get_socket() );
3868  std::weak_ptr<peer_connection> new_weak_peer(new_peer);
3869  new_peer->accept_or_connect_task_done = fc::async( [this, new_weak_peer]() {
3870  peer_connection_ptr new_peer(new_weak_peer.lock());
3871  assert(new_peer);
3872  if (!new_peer)
3873  return;
3874  accept_connection_task(new_peer);
3875  }, "accept_connection_task" );
3876 
3877  // limit the rate at which we accept connections to mitigate DOS attacks
3879  } FC_CAPTURE_AND_LOG( (0) )
3880  }
3881  } // accept_loop()
3882 
3884  {
3887 
3888  fc::sha256::encoder shared_secret_encoder;
3889  fc::sha512 shared_secret = peer->get_shared_secret();
3890  shared_secret_encoder.write(shared_secret.data(), sizeof(shared_secret));
3892 
3893  // in the hello messsage, we send three things:
3894  // ip address
3895  // outbound port
3896  // inbound port
3897  // The peer we're connecting to will assume we're firewalled if the
3898  // ip address and outbound port we send don't match the values it sees on its remote endpoint
3899  //
3900  // if we know that we're behind a NAT that will allow incoming connections because our firewall
3901  // detection figured it out, send those values instead.
3902 
3903  fc::ip::endpoint local_endpoint(peer->get_socket().local_endpoint());
3905 
3908  {
3909  local_endpoint = *_publicly_visible_listening_endpoint;
3910  listening_port = _publicly_visible_listening_endpoint->port();
3911  }
3912 
3915  local_endpoint.get_address(),
3916  listening_port,
3917  local_endpoint.port(),
3919  signature,
3920  _chain_id,
3922 
3923  peer->send_message(message(hello));
3924  }
3925 
3927  const fc::ip::endpoint& remote_endpoint)
3928  {
3930 
3931  if (!new_peer->performing_firewall_check())
3932  {
3933  // create or find the database entry for the new peer
3934  // if we're connecting to them, we believe they're not firewalled
3935  potential_peer_record updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_endpoint(remote_endpoint);
3937  updated_peer_record.last_connection_attempt_time = fc::time_point::now();;
3938  _potential_peer_db.update_entry(updated_peer_record);
3939  }
3940  else
3941  {
3942  wlog("connecting to peer ${peer} for firewall check", ("peer", new_peer->get_remote_endpoint()));
3943  }
3944 
3945  fc::oexception connect_failed_exception;
3946 
3947  try
3948  {
3949  new_peer->connect_to(remote_endpoint, _actual_listening_endpoint); // blocks until the connection is established and secure connection is negotiated
3950 
3951  // we connected to the peer. guess they're not firewalled....
3952  new_peer->is_firewalled = firewalled_state::not_firewalled;
3953 
3954  // connection succeeded, we've started handshaking. record that in our database
3955  potential_peer_record updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_endpoint(remote_endpoint);
3957  updated_peer_record.number_of_successful_connection_attempts++;
3958  updated_peer_record.last_seen_time = fc::time_point::now();
3959  _potential_peer_db.update_entry(updated_peer_record);
3960  }
3961  catch (const fc::exception& except)
3962  {
3963  connect_failed_exception = except;
3964  }
3965 
3966  if (connect_failed_exception && !new_peer->performing_firewall_check())
3967  {
3968  // connection failed. record that in our database
3969  potential_peer_record updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_endpoint(remote_endpoint);
3971  updated_peer_record.number_of_failed_connection_attempts++;
3972  if (new_peer->connection_closed_error)
3973  updated_peer_record.last_error = *new_peer->connection_closed_error;
3974  else
3975  updated_peer_record.last_error = *connect_failed_exception;
3976  _potential_peer_db.update_entry(updated_peer_record);
3977  }
3978 
3979  if (new_peer->performing_firewall_check())
3980  {
3981  // we were connecting to test whether the node is firewalled, and we now know the result.
3982  // send a message back to the requester
3983  peer_connection_ptr requesting_peer = get_peer_by_node_id(new_peer->firewall_check_state->requesting_peer);
3984  if (requesting_peer)
3985  {
3987  reply.endpoint_checked = new_peer->firewall_check_state->endpoint_to_test;
3988  reply.node_id = new_peer->firewall_check_state->expected_node_id;
3989  reply.result = connect_failed_exception ?
3992  wlog("firewall check of ${peer_checked} ${success_or_failure}, sending reply to ${requester}",
3993  ("peer_checked", new_peer->get_remote_endpoint())
3994  ("success_or_failure", connect_failed_exception ? "failed" : "succeeded" )
3995  ("requester", requesting_peer->get_remote_endpoint()));
3996 
3997  requesting_peer->send_message(reply);
3998  }
3999  }
4000 
4001  if (connect_failed_exception || new_peer->performing_firewall_check())
4002  {
4003  // if the connection failed or if this connection was just intended to check
4004  // whether the peer is firewalled, we want to disconnect now.
4005  _handshaking_connections.erase(new_peer);
4006  _terminating_connections.erase(new_peer);
4007  assert(_active_connections.find(new_peer) == _active_connections.end());
4008  _active_connections.erase(new_peer);
4009  assert(_closing_connections.find(new_peer) == _closing_connections.end());
4010  _closing_connections.erase(new_peer);
4011 
4014  schedule_peer_for_deletion(new_peer);
4015 
4016  if (connect_failed_exception)
4017  throw *connect_failed_exception;
4018  }
4019  else
4020  {
4021  // connection was successful and we want to stay connected
4022  fc::ip::endpoint local_endpoint = new_peer->get_local_endpoint();
4023  new_peer->inbound_address = local_endpoint.get_address();
4025  new_peer->outbound_port = local_endpoint.port();
4026 
4029  send_hello_message(new_peer);
4030  dlog("Sent \"hello\" to peer ${peer}", ("peer", new_peer->get_remote_endpoint()));
4031  }
4032  }
4033 
4034  // methods implementing node's public interface
4035  void node_impl::set_node_delegate(node_delegate* del, fc::thread* thread_for_delegate_calls)
4036  {
4038  _delegate.reset();
4039  if (del)
4040  _delegate.reset(new statistics_gathering_node_delegate_wrapper(del, thread_for_delegate_calls));
4041  if( _delegate )
4042  _chain_id = del->get_chain_id();
4043  }
4044 
4045  void node_impl::load_configuration( const fc::path& configuration_directory )
4046  {
4048  _node_configuration_directory = configuration_directory;
4050  bool node_configuration_loaded = false;
4051  if( fc::exists(configuration_file_name ) )
4052  {
4053  try
4054  {
4056  ilog( "Loaded configuration from file ${filename}", ("filename", configuration_file_name ) );
4057 
4060 
4061  node_configuration_loaded = true;
4062  }
4063  catch ( fc::parse_error_exception& parse_error )
4064  {
4065  elog( "malformed node configuration file ${filename}: ${error}",
4066  ( "filename", configuration_file_name )("error", parse_error.to_detail_string() ) );
4067  }
4068  catch ( fc::exception& except )
4069  {
4070  elog( "unexpected exception while reading configuration file ${filename}: ${error}",
4071  ( "filename", configuration_file_name )("error", except.to_detail_string() ) );
4072  }
4073  }
4074 
4075  if( !node_configuration_loaded )
4076  {
4078 
4079 #ifdef GRAPHENE_TEST_NETWORK
4080  uint32_t port = GRAPHENE_NET_TEST_P2P_PORT;
4081 #else
4082  uint32_t port = GRAPHENE_NET_DEFAULT_P2P_PORT;
4083 #endif
4087 
4088  ilog( "generating new private key for this node" );
4090  }
4091 
4093 
4094  fc::path potential_peer_database_file_name(_node_configuration_directory / POTENTIAL_PEER_DATABASE_FILENAME);
4095  try
4096  {
4097  _potential_peer_db.open(potential_peer_database_file_name);
4098 
4099  // push back the time on all peers loaded from the database so we will be able to retry them immediately
4101  {
4102  potential_peer_record updated_peer_record = *itr;
4103  updated_peer_record.last_connection_attempt_time = std::min<fc::time_point_sec>(updated_peer_record.last_connection_attempt_time,
4105  _potential_peer_db.update_entry(updated_peer_record);
4106  }
4107 
4109  }
4110  catch (fc::exception& except)
4111  {
4112  elog("unable to open peer database ${filename}: ${error}",
4113  ("filename", potential_peer_database_file_name)("error", except.to_detail_string()));
4114  throw;
4115  }
4116  }
4117 
4119  {
4122  {
4123  wlog("accept_incoming_connections is false, p2p network will not accept any incoming connections");
4124  return;
4125  }
4126 
4128 
4130  if( listen_endpoint.port() != 0 )
4131  {
4132  // if the user specified a port, we only want to bind to it if it's not already
4133  // being used by another application. During normal operation, we set the
4134  // SO_REUSEADDR/SO_REUSEPORT flags so that we can bind outbound sockets to the
4135  // same local endpoint as we're listening on here. On some platforms, setting
4136  // those flags will prevent us from detecting that other applications are
4137  // listening on that port. We'd like to detect that, so we'll set up a temporary
4138  // tcp server without that flag to see if we can listen on that port.
4139  bool first = true;
4140  for( ;; )
4141  {
4142  bool listen_failed = false;
4143 
4144  try
4145  {
4146  fc::tcp_server temporary_server;
4147  if( listen_endpoint.get_address() != fc::ip::address() )
4148  temporary_server.listen( listen_endpoint );
4149  else
4150  temporary_server.listen( listen_endpoint.port() );
4151  break;
4152  }
4153  catch ( const fc::exception&)
4154  {
4155  listen_failed = true;
4156  }
4157 
4158  if (listen_failed)
4159  {
4161  {
4162  std::ostringstream error_message_stream;
4163  if( first )
4164  {
4165  error_message_stream << "Unable to listen for connections on port " << listen_endpoint.port()
4166  << ", retrying in a few seconds\n";
4167  error_message_stream << "You can wait for it to become available, or restart this program using\n";
4168  error_message_stream << "the --p2p-endpoint option to specify another port\n";
4169  first = false;
4170  }
4171  else
4172  {
4173  error_message_stream << "\nStill waiting for port " << listen_endpoint.port() << " to become available\n";
4174  }
4175  std::string error_message = error_message_stream.str();
4176  wlog(error_message);
4177  std::cout << "\033[31m" << error_message;
4178  _delegate->error_encountered( error_message, fc::oexception() );
4179  fc::usleep( fc::seconds(5 ) );
4180  }
4181  else // don't wait, just find a random port
4182  {
4183  wlog( "unable to bind on the requested endpoint ${endpoint}, which probably means that endpoint is already in use",
4184  ( "endpoint", listen_endpoint ) );
4185  listen_endpoint.set_port( 0 );
4186  }
4187  } // if (listen_failed)
4188  } // for(;;)
4189  } // if (listen_endpoint.port() != 0)
4190  else // port is 0
4191  {
4192  // if they requested a random port, we'll just assume it's available
4193  // (it may not be due to ip address, but we'll detect that in the next step)
4194  }
4195 
4197  try
4198  {
4199  if( listen_endpoint.get_address() != fc::ip::address() )
4200  _tcp_server.listen( listen_endpoint );
4201  else
4202  _tcp_server.listen( listen_endpoint.port() );
4204  ilog( "listening for connections on endpoint ${endpoint} (our first choice)",
4205  ( "endpoint", _actual_listening_endpoint ) );
4206  }
4207  catch ( fc::exception& e )
4208  {
4209  FC_RETHROW_EXCEPTION( e, error, "unable to listen on ${endpoint}", ("endpoint",listen_endpoint ) );
4210  }
4211  }
4212 
4214  {
4217 
4218  assert(!_accept_loop_complete.valid() &&
4229  _accept_loop_complete = fc::async( [=](){ accept_loop(); }, "accept_loop");
4230  _p2p_network_connect_loop_done = fc::async( [=]() { p2p_network_connect_loop(); }, "p2p_network_connect_loop" );
4231  _fetch_sync_items_loop_done = fc::async( [=]() { fetch_sync_items_loop(); }, "fetch_sync_items_loop" );
4232  _fetch_item_loop_done = fc::async( [=]() { fetch_items_loop(); }, "fetch_items_loop" );
4233  _advertise_inventory_loop_done = fc::async( [=]() { advertise_inventory_loop(); }, "advertise_inventory_loop" );
4234  _terminate_inactive_connections_loop_done = fc::async( [=]() { terminate_inactive_connections_loop(); }, "terminate_inactive_connections_loop" );
4235  _fetch_updated_peer_lists_loop_done = fc::async([=](){ fetch_updated_peer_lists_loop(); }, "fetch_updated_peer_lists_loop");
4236  _bandwidth_monitor_loop_done = fc::async([=](){ bandwidth_monitor_loop(); }, "bandwidth_monitor_loop");
4237  _dump_node_status_task_done = fc::async([=](){ dump_node_status_task(); }, "dump_node_status_task");
4239  }
4240 
4242  {
4244  // if we're connecting to them, we believe they're not firewalled
4246 
4247  // if we've recently connected to this peer, reset the last_connection_attempt_time to allow
4248  // us to immediately retry this peer
4249  updated_peer_record.last_connection_attempt_time = std::min<fc::time_point_sec>(updated_peer_record.last_connection_attempt_time,
4251  _add_once_node_list.push_back(updated_peer_record);
4252  _potential_peer_db.update_entry(updated_peer_record);
4254  }
4255 
4256  void node_impl::add_seed_node(const std::string& endpoint_string)
4257  {
4259  _seed_nodes.insert( endpoint_string );
4260  resolve_seed_node_and_add( endpoint_string );
4261  }
4262 
4263  void node_impl::resolve_seed_node_and_add(const std::string& endpoint_string)
4264  {
4266  std::vector<fc::ip::endpoint> endpoints;
4267  ilog("Resolving seed node ${endpoint}", ("endpoint", endpoint_string));
4268  try
4269  {
4270  endpoints = graphene::net::node::resolve_string_to_ip_endpoints(endpoint_string);
4271  }
4272  catch(...)
4273  {
4274  wlog( "Unable to resolve endpoint during attempt to add seed node ${ep}", ("ep", endpoint_string) );
4275  }
4276  for (const fc::ip::endpoint& endpoint : endpoints)
4277  {
4278  ilog("Adding seed node ${endpoint}", ("endpoint", endpoint));
4279  add_node(endpoint);
4280  }
4281  }
4282 
4284  {
4285  new_peer->get_socket().open();
4286  new_peer->get_socket().set_reuse_address();
4287  new_peer->connection_initiation_time = fc::time_point::now();
4288  _handshaking_connections.insert(new_peer);
4289  _rate_limiter.add_tcp_socket(&new_peer->get_socket());
4290 
4292  return;
4293 
4294  std::weak_ptr<peer_connection> new_weak_peer(new_peer);
4295  new_peer->accept_or_connect_task_done = fc::async([this, new_weak_peer](){
4296  peer_connection_ptr new_peer(new_weak_peer.lock());
4297  assert(new_peer);
4298  if (!new_peer)
4299  return;
4300  connect_to_task(new_peer, *new_peer->get_remote_endpoint());
4301  }, "connect_to_task");
4302  }
4303 
4305  {
4307  if (is_connection_to_endpoint_in_progress(remote_endpoint))
4308  FC_THROW_EXCEPTION(already_connected_to_requested_peer, "already connected to requested endpoint ${endpoint}",
4309  ("endpoint", remote_endpoint));
4310 
4311  dlog("node_impl::connect_to_endpoint(${endpoint})", ("endpoint", remote_endpoint));
4313  new_peer->set_remote_endpoint(remote_endpoint);
4314  initiate_connect_to(new_peer);
4315  }
4316 
4318  {
4320  for( const peer_connection_ptr& active_peer : _active_connections )
4321  {
4322  fc::optional<fc::ip::endpoint> endpoint_for_this_peer( active_peer->get_remote_endpoint() );
4323  if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint )
4324  return active_peer;
4325  }
4326  for( const peer_connection_ptr& handshaking_peer : _handshaking_connections )
4327  {
4328  fc::optional<fc::ip::endpoint> endpoint_for_this_peer( handshaking_peer->get_remote_endpoint() );
4329  if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint )
4330  return handshaking_peer;
4331  }
4332  return peer_connection_ptr();
4333  }
4334 
4336  {
4338  return get_connection_to_endpoint( remote_endpoint ) != peer_connection_ptr();
4339  }
4340 
4342  {
4344  _active_connections.insert(peer);
4345  _handshaking_connections.erase(peer);
4346  _closing_connections.erase(peer);
4347  _terminating_connections.erase(peer);
4348  }
4349 
4351  {
4353  _active_connections.erase(peer);
4354  _handshaking_connections.erase(peer);
4355  _closing_connections.insert(peer);
4356  _terminating_connections.erase(peer);
4357  }
4358 
4360  {
4362  _active_connections.erase(peer);
4363  _handshaking_connections.erase(peer);
4364  _closing_connections.erase(peer);
4365  _terminating_connections.insert(peer);
4366  }
4367 
4369  {
4371  ilog( "----------------- PEER STATUS UPDATE --------------------" );
4372  ilog( " number of peers: ${active} active, ${handshaking}, ${closing} closing. attempting to maintain ${desired} - ${maximum} peers",
4373  ( "active", _active_connections.size() )("handshaking", _handshaking_connections.size() )("closing",_closing_connections.size() )
4374  ( "desired", _desired_number_of_connections )("maximum", _maximum_number_of_connections ) );
4375  for( const peer_connection_ptr& peer : _active_connections )
4376  {
4377  ilog( " active peer ${endpoint} peer_is_in_sync_with_us:${in_sync_with_us} we_are_in_sync_with_peer:${in_sync_with_them}",
4378  ( "endpoint", peer->get_remote_endpoint() )
4379  ( "in_sync_with_us", !peer->peer_needs_sync_items_from_us )("in_sync_with_them", !peer->we_need_sync_items_from_peer ) );
4380  if( peer->we_need_sync_items_from_peer )
4381  ilog( " above peer has ${count} sync items we might need", ("count", peer->ids_of_items_to_get.size() ) );
4382  if (peer->inhibit_fetching_sync_blocks)
4383  ilog( " we are not fetching sync blocks from the above peer (inhibit_fetching_sync_blocks == true)" );
4384 
4385  }
4386  for( const peer_connection_ptr& peer : _handshaking_connections )
4387  {
4388  ilog( " handshaking peer ${endpoint} in state ours(${our_state}) theirs(${their_state})",
4389  ( "endpoint", peer->get_remote_endpoint() )("our_state", peer->our_state )("their_state", peer->their_state ) );
4390  }
4391 
4392  ilog( "--------- MEMORY USAGE ------------" );
4393  ilog( "node._active_sync_requests size: ${size}", ("size", _active_sync_requests.size() ) );
4394  ilog( "node._received_sync_items size: ${size}", ("size", _received_sync_items.size() ) );
4395  ilog( "node._new_received_sync_items size: ${size}", ("size", _new_received_sync_items.size() ) );
4396  ilog( "node._items_to_fetch size: ${size}", ("size", _items_to_fetch.size() ) );
4397  ilog( "node._new_inventory size: ${size}", ("size", _new_inventory.size() ) );
4398  ilog( "node._message_cache size: ${size}", ("size", _message_cache.size() ) );
4399  for( const peer_connection_ptr& peer : _active_connections )
4400  {
4401  ilog( " peer ${endpoint}", ("endpoint", peer->get_remote_endpoint() ) );
4402  ilog( " peer.ids_of_items_to_get size: ${size}", ("size", peer->ids_of_items_to_get.size() ) );
4403  ilog( " peer.inventory_peer_advertised_to_us size: ${size}", ("size", peer->inventory_peer_advertised_to_us.size() ) );
4404  ilog( " peer.inventory_advertised_to_peer size: ${size}", ("size", peer->inventory_advertised_to_peer.size() ) );
4405  ilog( " peer.items_requested_from_peer size: ${size}", ("size", peer->items_requested_from_peer.size() ) );
4406  ilog( " peer.sync_items_requested_from_peer size: ${size}", ("size", peer->sync_items_requested_from_peer.size() ) );
4407  }
4408  ilog( "--------- END MEMORY USAGE ------------" );
4409  }
4410 
4412  const std::string& reason_for_disconnect,
4413  bool caused_by_error /* = false */,
4414  const fc::oexception& error /* = fc::oexception() */ )
4415  {
4417  move_peer_to_closing_list(peer_to_disconnect->shared_from_this());
4418 
4419  if (peer_to_disconnect->they_have_requested_close)
4420  {
4421  // the peer has already told us that it's ready to close the connection, so just close the connection
4422  peer_to_disconnect->close_connection();
4423  }
4424  else
4425  {
4426  // we're the first to try to want to close the connection
4427  fc::optional<fc::ip::endpoint> inbound_endpoint = peer_to_disconnect->get_endpoint_for_connecting();
4428  if (inbound_endpoint)
4429  {
4430  fc::optional<potential_peer_record> updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint);
4431  if (updated_peer_record)
4432  {
4433  updated_peer_record->last_seen_time = fc::time_point::now();
4434  if (error)
4435  updated_peer_record->last_error = error;
4436  else
4437  updated_peer_record->last_error = fc::exception(FC_LOG_MESSAGE(info, reason_for_disconnect.c_str()));
4438  _potential_peer_db.update_entry(*updated_peer_record);
4439  }
4440  }
4441  peer_to_disconnect->we_have_requested_close = true;
4442  peer_to_disconnect->connection_closed_time = fc::time_point::now();
4443 
4444  closing_connection_message closing_message( reason_for_disconnect, caused_by_error, error );
4445  peer_to_disconnect->send_message( closing_message );
4446  }
4447 
4448  // notify the user. This will be useful in testing, but we might want to remove it later.
4449  // It makes good sense to notify the user if other nodes think she is behaving badly, but
4450  // if we're just detecting and dissconnecting other badly-behaving nodes, they don't really care.
4451  if (caused_by_error)
4452  {
4453  std::ostringstream error_message;
4454  error_message << "I am disconnecting peer " << fc::variant( peer_to_disconnect->get_remote_endpoint(), GRAPHENE_NET_MAX_NESTED_OBJECTS ).as_string() <<
4455  " for reason: " << reason_for_disconnect;
4456  _delegate->error_encountered(error_message.str(), fc::oexception());
4457  dlog(error_message.str());
4458  }
4459  else
4460  dlog("Disconnecting from ${peer} for ${reason}", ("peer",peer_to_disconnect->get_remote_endpoint()) ("reason",reason_for_disconnect));
4461  }
4462 
4463  void node_impl::listen_on_endpoint( const fc::ip::endpoint& ep, bool wait_if_not_available )
4464  {
4467  _node_configuration.wait_if_endpoint_is_busy = wait_if_not_available;
4469  }
4470 
4472  {
4476  }
4477 
4478  void node_impl::listen_on_port( uint16_t port, bool wait_if_not_available )
4479  {
4482  _node_configuration.wait_if_endpoint_is_busy = wait_if_not_available;
4484  }
4485 
4487  {
4490  }
4491 
4492  std::vector<peer_status> node_impl::get_connected_peers() const
4493  {
4495  std::vector<peer_status> statuses;
4496  for (const peer_connection_ptr& peer : _active_connections)
4497  {
4498  ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
4499 
4500  peer_status this_peer_status;
4501  this_peer_status.version = 0;
4502  fc::optional<fc::ip::endpoint> endpoint = peer->get_remote_endpoint();
4503  if (endpoint)
4504  this_peer_status.host = *endpoint;
4505  fc::mutable_variant_object peer_details;
4506  peer_details["addr"] = endpoint ? (std::string)*endpoint : std::string();
4507  peer_details["addrlocal"] = (std::string)peer->get_local_endpoint();
4508  peer_details["services"] = "00000001";
4509  peer_details["lastsend"] = peer->get_last_message_sent_time().sec_since_epoch();
4510  peer_details["lastrecv"] = peer->get_last_message_received_time().sec_since_epoch();
4511  peer_details["bytessent"] = peer->get_total_bytes_sent();
4512  peer_details["bytesrecv"] = peer->get_total_bytes_received();
4513  peer_details["conntime"] = peer->get_connection_time();
4514  peer_details["pingtime"] = "";
4515  peer_details["pingwait"] = "";
4516  peer_details["version"] = "";
4517  peer_details["subver"] = peer->user_agent;
4518  peer_details["inbound"] = peer->direction == peer_connection_direction::inbound;
4519  peer_details["firewall_status"] = fc::variant( peer->is_firewalled, 1 );
4520  peer_details["startingheight"] = "";
4521  peer_details["banscore"] = "";
4522  peer_details["syncnode"] = "";
4523 
4524  if (peer->fc_git_revision_sha)
4525  {
4526  std::string revision_string = *peer->fc_git_revision_sha;
4527  if (*peer->fc_git_revision_sha == fc::git_revision_sha)
4528  revision_string += " (same as ours)";
4529  else
4530  revision_string += " (different from ours)";
4531  peer_details["fc_git_revision_sha"] = revision_string;
4532 
4533  }
4534  if (peer->fc_git_revision_unix_timestamp)
4535  {
4536  peer_details["fc_git_revision_unix_timestamp"] = *peer->fc_git_revision_unix_timestamp;
4537  std::string age_string = fc::get_approximate_relative_time_string( *peer->fc_git_revision_unix_timestamp);
4538  if (*peer->fc_git_revision_unix_timestamp == fc::time_point_sec(fc::git_revision_unix_timestamp))
4539  age_string += " (same as ours)";
4540  else if (*peer->fc_git_revision_unix_timestamp > fc::time_point_sec(fc::git_revision_unix_timestamp))
4541  age_string += " (newer than ours)";
4542  else
4543  age_string += " (older than ours)";
4544  peer_details["fc_git_revision_age"] = age_string;
4545  }
4546 
4547  if (peer->platform)
4548  peer_details["platform"] = *peer->platform;
4549 
4550  // provide these for debugging
4551  // warning: these are just approximations, if the peer is "downstream" of us, they may
4552  // have received blocks from other peers that we are unaware of
4553  peer_details["current_head_block"] = fc::variant( peer->last_block_delegate_has_seen, 1 );
4554  peer_details["current_head_block_number"] = _delegate->get_block_number(peer->last_block_delegate_has_seen);
4555  peer_details["current_head_block_time"] = peer->last_block_time_delegate_has_seen;
4556 
4557  this_peer_status.info = peer_details;
4558  statuses.push_back(this_peer_status);
4559  }
4560  return statuses;
4561  }
4562 
4564  {
4566  return (uint32_t)_active_connections.size();
4567  }
4568 
4569  void node_impl::broadcast( const message& item_to_broadcast, const message_propagation_data& propagation_data )
4570  {
4572  fc::uint160_t hash_of_message_contents;
4573  if( item_to_broadcast.msg_type.value() == graphene::net::block_message_type )
4574  {
4575  graphene::net::block_message block_message_to_broadcast = item_to_broadcast.as<graphene::net::block_message>();
4576  hash_of_message_contents = block_message_to_broadcast.block_id; // for debugging
4577  _most_recent_blocks_accepted.push_back( block_message_to_broadcast.block_id );
4578  }
4579  else if( item_to_broadcast.msg_type.value() == graphene::net::trx_message_type )
4580  {
4581  graphene::net::trx_message transaction_message_to_broadcast = item_to_broadcast.as<graphene::net::trx_message>();
4582  hash_of_message_contents = transaction_message_to_broadcast.trx.id(); // for debugging
4583  dlog( "broadcasting trx: ${trx}", ("trx", transaction_message_to_broadcast) );
4584  }
4585  message_hash_type hash_of_item_to_broadcast = item_to_broadcast.id();
4586 
4587  _message_cache.cache_message( item_to_broadcast, hash_of_item_to_broadcast, propagation_data, hash_of_message_contents );
4588  _new_inventory.insert( item_id(item_to_broadcast.msg_type.value(), hash_of_item_to_broadcast ) );
4590  }
4591 
4592  void node_impl::broadcast( const message& item_to_broadcast )
4593  {
4595  // this version is called directly from the client
4597  broadcast( item_to_broadcast, propagation_data );
4598  }
4599 
4600  void node_impl::sync_from(const item_id& current_head_block, const std::vector<uint32_t>& hard_fork_block_numbers)
4601  {
4604  _sync_item_type = current_head_block.item_type;
4605  _most_recent_blocks_accepted.push_back(current_head_block.item_hash);
4606  _hard_fork_block_numbers = hard_fork_block_numbers;
4607  }
4608 
4610  {
4612  return !_active_connections.empty();
4613  }
4614 
4615  std::vector<potential_peer_record> node_impl::get_potential_peers() const
4616  {
4618  std::vector<potential_peer_record> result;
4619  // use explicit iterators here, for some reason the mac compiler can't used ranged-based for loops here
4621  result.push_back(*itr);
4622  return result;
4623  }
4624 
4626  {
4628  if (params.contains("peer_connection_retry_timeout"))
4629  _peer_connection_retry_timeout = params["peer_connection_retry_timeout"].as<uint32_t>(1);
4630  if (params.contains("desired_number_of_connections"))
4631  _desired_number_of_connections = params["desired_number_of_connections"].as<uint32_t>(1);
4632  if (params.contains("maximum_number_of_connections"))
4633  _maximum_number_of_connections = params["maximum_number_of_connections"].as<uint32_t>(1);
4634  if (params.contains("maximum_number_of_blocks_to_handle_at_one_time"))
4635  _maximum_number_of_blocks_to_handle_at_one_time = params["maximum_number_of_blocks_to_handle_at_one_time"].as<uint32_t>(1);
4636  if (params.contains("maximum_number_of_sync_blocks_to_prefetch"))
4637  _maximum_number_of_sync_blocks_to_prefetch = params["maximum_number_of_sync_blocks_to_prefetch"].as<uint32_t>(1);
4638  if (params.contains("maximum_blocks_per_peer_during_syncing"))
4639  _maximum_blocks_per_peer_during_syncing = params["maximum_blocks_per_peer_during_syncing"].as<uint32_t>(1);
4640 
4642 
4645  "I have too many connections open");
4647  }
4648 
4650  {
4653  result["peer_connection_retry_timeout"] = _peer_connection_retry_timeout;
4654  result["desired_number_of_connections"] = _desired_number_of_connections;
4655  result["maximum_number_of_connections"] = _maximum_number_of_connections;
4656  result["maximum_number_of_blocks_to_handle_at_one_time"] = _maximum_number_of_blocks_to_handle_at_one_time;
4657  result["maximum_number_of_sync_blocks_to_prefetch"] = _maximum_number_of_sync_blocks_to_prefetch;
4658  result["maximum_blocks_per_peer_during_syncing"] = _maximum_blocks_per_peer_during_syncing;
4659  return result;
4660  }
4661 
4663  {
4665  return _message_cache.get_message_propagation_data( transaction_id );
4666  }
4667 
4669  {
4671  return _message_cache.get_message_propagation_data( block_id );
4672  }
4673 
4675  {
4677  return _node_id;
4678  }
4679  void node_impl::set_allowed_peers(const std::vector<node_id_t>& allowed_peers)
4680  {
4682 #ifdef ENABLE_P2P_DEBUGGING_API
4683  _allowed_peers.clear();
4684  _allowed_peers.insert(allowed_peers.begin(), allowed_peers.end());
4685  std::list<peer_connection_ptr> peers_to_disconnect;
4686  if (!_allowed_peers.empty())
4687  for (const peer_connection_ptr& peer : _active_connections)
4688  if (_allowed_peers.find(peer->node_id) == _allowed_peers.end())
4689  peers_to_disconnect.push_back(peer);
4690  for (const peer_connection_ptr& peer : peers_to_disconnect)
4691  disconnect_from_peer(peer.get(), "My allowed_peers list has changed, and you're no longer allowed. Bye.");
4692 #endif // ENABLE_P2P_DEBUGGING_API
4693  }
4695  {
4698  }
4699 
4700  void node_impl::set_total_bandwidth_limit( uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second )
4701  {
4703  _rate_limiter.set_upload_limit( upload_bytes_per_second );
4704  _rate_limiter.set_download_limit( download_bytes_per_second );
4705  }
4706 
4708  {
4711  }
4712 
4714  {
4716  return _delegate->get_call_statistics();
4717  }
4718 
4720  {
4723  info["listening_on"] = _actual_listening_endpoint;
4724  info["node_public_key"] = fc::variant( _node_public_key, 1 );
4725  info["node_id"] = fc::variant( _node_id, 1 );
4726  info["firewalled"] = fc::variant( _is_firewalled, 1 );
4727  return info;
4728  }
4730  {
4732  std::vector<uint32_t> network_usage_by_second;
4733  network_usage_by_second.reserve(_average_network_read_speed_seconds.size());
4736  std::back_inserter(network_usage_by_second),
4737  std::plus<uint32_t>());
4738 
4739  std::vector<uint32_t> network_usage_by_minute;
4740  network_usage_by_minute.reserve(_average_network_read_speed_minutes.size());
4743  std::back_inserter(network_usage_by_minute),
4744  std::plus<uint32_t>());
4745 
4746  std::vector<uint32_t> network_usage_by_hour;
4747  network_usage_by_hour.reserve(_average_network_read_speed_hours.size());
4750  std::back_inserter(network_usage_by_hour),
4751  std::plus<uint32_t>());
4752 
4754  result["usage_by_second"] = fc::variant( network_usage_by_second, 2 );
4755  result["usage_by_minute"] = fc::variant( network_usage_by_minute, 2 );
4756  result["usage_by_hour"] = fc::variant( network_usage_by_hour, 2 );
4757  return result;
4758  }
4759 
4760  bool node_impl::is_hard_fork_block(uint32_t block_number) const
4761  {
4762  return std::binary_search(_hard_fork_block_numbers.begin(), _hard_fork_block_numbers.end(), block_number);
4763  }
4764  uint32_t node_impl::get_next_known_hard_fork_block_number(uint32_t block_number) const
4765  {
4766  auto iter = std::upper_bound(_hard_fork_block_numbers.begin(), _hard_fork_block_numbers.end(),
4767  block_number);
4768  return iter != _hard_fork_block_numbers.end() ? *iter : 0;
4769  }
4770 
4771  } // end namespace detail
4772 
4773 
4774 
4776  // implement node functions, they call the matching function in to detail::node_impl in the correct thread //
4777 
4778 #ifdef P2P_IN_DEDICATED_THREAD
4779 # define INVOKE_IN_IMPL(method_name, ...) \
4780  return my->_thread->async([&](){ return my->method_name(__VA_ARGS__); }, "thread invoke for method " BOOST_PP_STRINGIZE(method_name)).wait()
4781 #else
4782 # define INVOKE_IN_IMPL(method_name, ...) \
4783  return my->method_name(__VA_ARGS__)
4784 #endif // P2P_IN_DEDICATED_THREAD
4785 
4786  node::node(const std::string& user_agent) :
4787  my(new detail::node_impl(user_agent))
4788  {
4789  }
4790 
4792  {
4793  }
4794 
4796  {
4797  fc::thread* delegate_thread = &fc::thread::current();
4798  INVOKE_IN_IMPL(set_node_delegate, del, delegate_thread);
4799  }
4800 
4801  void node::load_configuration( const fc::path& configuration_directory )
4802  {
4803  INVOKE_IN_IMPL(load_configuration, configuration_directory);
4804  }
4805 
4807  {
4809  }
4810 
4812  {
4814  }
4815 
4817  {
4818  INVOKE_IN_IMPL(add_node, ep);
4819  }
4820 
4821  void node::connect_to_endpoint( const fc::ip::endpoint& remote_endpoint )
4822  {
4823  INVOKE_IN_IMPL(connect_to_endpoint, remote_endpoint);
4824  }
4825 
4826  void node::listen_on_endpoint(const fc::ip::endpoint& ep , bool wait_if_not_available)
4827  {
4828  INVOKE_IN_IMPL(listen_on_endpoint, ep, wait_if_not_available);
4829  }
4830 
4832  {
4834  }
4835 
4836  void node::listen_on_port( uint16_t port, bool wait_if_not_available )
4837  {
4838  INVOKE_IN_IMPL(listen_on_port, port, wait_if_not_available);
4839  }
4840 
4842  {
4844  }
4845 
4846  std::vector<peer_status> node::get_connected_peers() const
4847  {
4849  }
4850 
4852  {
4854  }
4855 
4856  void node::broadcast( const message& msg )
4857  {
4858  INVOKE_IN_IMPL(broadcast, msg);
4859  }
4860 
4861  void node::sync_from(const item_id& current_head_block, const std::vector<uint32_t>& hard_fork_block_numbers)
4862  {
4863