BitShares-Core  6.1.0
BitShares blockchain implementation and command-line interface software
node_impl.hxx
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Cryptonomex, Inc., and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 #pragma once
25 
26 #define P2P_IN_DEDICATED_THREAD
27 
28 //#define ENABLE_DEBUG_ULOGS
29 
30 #ifdef DEFAULT_LOGGER
31 # undef DEFAULT_LOGGER
32 #endif
33 #define DEFAULT_LOGGER "p2p"
34 
35 //log these messages even at warn level when operating on the test network
36 #ifdef GRAPHENE_TEST_NETWORK
37 #define testnetlog wlog
38 #else
39 #define testnetlog(...) do {} while (0)
40 #endif
41 
42 #include <memory>
43 #include <boost/accumulators/accumulators.hpp>
44 #include <boost/accumulators/statistics.hpp>
45 #include <boost/accumulators/statistics/rolling_mean.hpp>
46 #include <fc/thread/thread.hpp>
47 #include <fc/thread/mutex.hpp>
49 #include <fc/log/logger.hpp>
54 #include <graphene/net/node.hpp>
57 
58 namespace graphene { namespace net { namespace detail {
59 
60 namespace bmi = boost::multi_index;
61 
62 /*******
63  * A class to wrap std::unordered_set for multithreading
64  */
65 template <class Key, class Hash = std::hash<Key>, class Pred = std::equal_to<Key> >
66 class concurrent_unordered_set : private std::unordered_set<Key, Hash, Pred>
67 {
68 private:
69  mutable fc::mutex mux;
70 
71 public:
73  fc::mutex& get_mutex()const { return mux; }
74 
77  std::pair< typename std::unordered_set<Key, Hash, Pred>::iterator, bool> emplace( Key key)
78  {
80  return std::unordered_set<Key, Hash, Pred>::emplace( key );
81  }
82  std::pair< typename std::unordered_set<Key, Hash, Pred>::iterator, bool> insert (const Key& val)
83  {
85  return std::unordered_set<Key, Hash, Pred>::insert( val );
86  }
90  size_t size() const
91  {
93  return std::unordered_set<Key, Hash, Pred>::size();
94  }
95  bool empty() const noexcept
96  {
98  return std::unordered_set<Key, Hash, Pred>::empty();
99  }
103  void clear() noexcept
104  {
105  fc::scoped_lock<fc::mutex> lock(mux);
106  std::unordered_set<Key, Hash, Pred>::clear();
107  }
108  typename std::unordered_set<Key, Hash, Pred>::iterator erase(
109  typename std::unordered_set<Key, Hash, Pred>::const_iterator itr)
110  {
111  fc::scoped_lock<fc::mutex> lock(mux);
112  return std::unordered_set<Key, Hash, Pred>::erase( itr);
113  }
114  size_t erase( const Key& key)
115  {
116  fc::scoped_lock<fc::mutex> lock(mux);
117  return std::unordered_set<Key, Hash, Pred>::erase( key );
118  }
122  void swap( typename std::unordered_set<Key, Hash, Pred>& other ) noexcept
123  {
124  fc::scoped_lock<fc::mutex> lock(mux);
125  std::unordered_set<Key, Hash, Pred>::swap( other );
126  }
130  typename std::unordered_set<Key, Hash, Pred>::iterator begin() noexcept
131  {
132  fc::scoped_lock<fc::mutex> lock(mux);
133  return std::unordered_set<Key, Hash, Pred>::begin();
134  }
135  typename std::unordered_set<Key, Hash, Pred>::const_iterator begin() const noexcept
136  {
137  fc::scoped_lock<fc::mutex> lock(mux);
138  return std::unordered_set<Key, Hash, Pred>::begin();
139  }
140  typename std::unordered_set<Key, Hash, Pred>::local_iterator begin(size_t n)
141  {
142  fc::scoped_lock<fc::mutex> lock(mux);
143  return std::unordered_set<Key, Hash, Pred>::begin(n);
144  }
145  typename std::unordered_set<Key, Hash, Pred>::const_local_iterator begin(size_t n) const
146  {
147  fc::scoped_lock<fc::mutex> lock(mux);
148  return std::unordered_set<Key, Hash, Pred>::begin(n);
149  }
150  typename std::unordered_set<Key, Hash, Pred>::iterator end() noexcept
151  {
152  fc::scoped_lock<fc::mutex> lock(mux);
153  return std::unordered_set<Key, Hash, Pred>::end();
154  }
155  typename std::unordered_set<Key, Hash, Pred>::const_iterator end() const noexcept
156  {
157  fc::scoped_lock<fc::mutex> lock(mux);
158  return std::unordered_set<Key, Hash, Pred>::end();
159  }
160  typename std::unordered_set<Key, Hash, Pred>::local_iterator end(size_t n)
161  {
162  fc::scoped_lock<fc::mutex> lock(mux);
163  return std::unordered_set<Key, Hash, Pred>::end(n);
164  }
165  typename std::unordered_set<Key, Hash, Pred>::const_local_iterator end(size_t n) const
166  {
167  fc::scoped_lock<fc::mutex> lock(mux);
168  return std::unordered_set<Key, Hash, Pred>::end(n);
169  }
172  typename std::unordered_set<Key, Hash, Pred>::const_iterator find(Key key)
173  {
174  fc::scoped_lock<fc::mutex> lock(mux);
175  return std::unordered_set<Key, Hash, Pred>::find(key);
176  }
177 };
178 
180 {
181 private:
182  static const uint32_t cache_duration_in_blocks = GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS;
183 
184  struct message_hash_index{};
185  struct message_contents_hash_index{};
186  struct block_clock_index{};
187  struct message_info
188  {
189  message_hash_type message_hash;
190  message message_body;
191  uint32_t block_clock_when_received;
192 
194  message_propagation_data propagation_data;
197  message_hash_type message_contents_hash;
198 
199  message_info( const message_hash_type& message_hash,
200  const message& message_body,
201  uint32_t block_clock_when_received,
202  const message_propagation_data& propagation_data,
203  message_hash_type message_contents_hash ) :
204  message_hash( message_hash ),
205  message_body( message_body ),
206  block_clock_when_received( block_clock_when_received ),
207  propagation_data( propagation_data ),
208  message_contents_hash( message_contents_hash )
209  {}
210  };
211 
212  using message_cache_container = boost::multi_index_container < message_info,
213  bmi::indexed_by<
214  bmi::ordered_unique< bmi::tag<message_hash_index>,
215  bmi::member<message_info, message_hash_type, &message_info::message_hash> >,
216  bmi::ordered_non_unique< bmi::tag<message_contents_hash_index>,
217  bmi::member<message_info, message_hash_type, &message_info::message_contents_hash> >,
218  bmi::ordered_non_unique< bmi::tag<block_clock_index>,
219  bmi::member<message_info, uint32_t, &message_info::block_clock_when_received> > > >;
220 
221  message_cache_container _message_cache;
222 
223  uint32_t block_clock = 0;
224 
225 public:
226  void block_accepted();
227  void cache_message( const message& message_to_cache,
228  const message_hash_type& hash_of_message_to_cache,
229  const message_propagation_data& propagation_data,
230  const message_hash_type& message_content_hash );
231  message get_message( const message_hash_type& hash_of_message_to_lookup ) const;
232  message_propagation_data get_message_propagation_data(
233  const message_hash_type& hash_of_msg_contents_to_lookup ) const;
234  size_t size() const { return _message_cache.size(); }
235 };
236 
240 {
244 
245  prioritized_item_id(const item_id& item, size_t sequence_number) :
246  item(item),
247  sequence_number(sequence_number),
248  timestamp(fc::time_point::now())
249  {}
250  bool operator<(const prioritized_item_id& rhs) const
251  {
253  "block_message_type must be greater than trx_message_type for prioritized_item_ids to sort correctly");
254  if (item.item_type != rhs.item.item_type)
255  return item.item_type > rhs.item.item_type;
256  return rhs.sequence_number > sequence_number;
257  }
258 };
259 
261 {
262  private:
263  std::shared_ptr<node_delegate> _node_delegate;
264  fc::thread *_thread;
265 
266  using call_stats_accumulator = boost::accumulators::accumulator_set< int64_t,
267  boost::accumulators::stats< boost::accumulators::tag::min,
268  boost::accumulators::tag::rolling_mean,
269  boost::accumulators::tag::max,
270  boost::accumulators::tag::sum,
271  boost::accumulators::tag::count> >;
272 #define NODE_DELEGATE_METHOD_NAMES (has_item) \
273  (handle_message) \
274  (handle_block) \
275  (handle_transaction) \
276  (get_block_ids) \
277  (get_item) \
278  (get_chain_id) \
279  (get_blockchain_synopsis) \
280  (sync_status) \
281  (connection_count_changed) \
282  (get_block_number) \
283  (get_block_time) \
284  (get_head_block_id) \
285  (estimate_last_known_fork_from_git_revision_timestamp) \
286  (error_encountered) \
287  (get_current_block_interval_in_seconds)
288 
289 
290 
291 #define DECLARE_ACCUMULATOR(r, data, method_name) \
292  mutable call_stats_accumulator BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _execution_accumulator)); \
293  mutable call_stats_accumulator BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _delay_before_accumulator)); \
294  mutable call_stats_accumulator BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _delay_after_accumulator));
295  BOOST_PP_SEQ_FOR_EACH(DECLARE_ACCUMULATOR, unused, NODE_DELEGATE_METHOD_NAMES)
296 #undef DECLARE_ACCUMULATOR
297 
298  class call_statistics_collector
299  {
300  private:
301  fc::time_point _call_requested_time;
302  fc::time_point _begin_execution_time;
303  fc::time_point _execution_completed_time;
304  const char* _method_name;
305  call_stats_accumulator* _execution_accumulator;
306  call_stats_accumulator* _delay_before_accumulator;
307  call_stats_accumulator* _delay_after_accumulator;
308  public:
310  {
311  std::shared_ptr<call_statistics_collector> _collector;
312  public:
313  explicit actual_execution_measurement_helper(std::shared_ptr<call_statistics_collector> collector) :
314  _collector(collector)
315  {
316  _collector->starting_execution();
317  }
319  {
320  _collector->execution_completed();
321  }
322  };
323  call_statistics_collector(const char* method_name,
324  call_stats_accumulator* execution_accumulator,
325  call_stats_accumulator* delay_before_accumulator,
326  call_stats_accumulator* delay_after_accumulator) :
327  _call_requested_time(fc::time_point::now()),
328  _method_name(method_name),
329  _execution_accumulator(execution_accumulator),
330  _delay_before_accumulator(delay_before_accumulator),
331  _delay_after_accumulator(delay_after_accumulator)
332  {}
333  ~call_statistics_collector()
334  {
336  fc::microseconds actual_execution_time(_execution_completed_time - _begin_execution_time);
337  fc::microseconds delay_before(_begin_execution_time - _call_requested_time);
338  fc::microseconds delay_after(end_time - _execution_completed_time);
339  fc::microseconds total_duration(actual_execution_time + delay_before + delay_after);
340  (*_execution_accumulator)(actual_execution_time.count());
341  (*_delay_before_accumulator)(delay_before.count());
342  (*_delay_after_accumulator)(delay_after.count());
343  if (total_duration > fc::milliseconds(500))
344  {
345  dlog("Call to method node_delegate::${method} took ${total_duration}us, longer than our target maximum of 500ms",
346  ("method", _method_name)
347  ("total_duration", total_duration.count()));
348  dlog("Actual execution took ${execution_duration}us, with a ${delegate_delay}us delay before the delegate thread started "
349  "executing the method, and a ${p2p_delay}us delay after it finished before the p2p thread started processing the response",
350  ("execution_duration", actual_execution_time)
351  ("delegate_delay", delay_before)
352  ("p2p_delay", delay_after));
353  }
354  }
355  void starting_execution()
356  {
357  _begin_execution_time = fc::time_point::now();
358  }
359  void execution_completed()
360  {
361  _execution_completed_time = fc::time_point::now();
362  }
363  };
364  public:
365  statistics_gathering_node_delegate_wrapper(std::shared_ptr<node_delegate> delegate,
366  fc::thread* thread_for_delegate_calls);
367 
368  fc::variant_object get_call_statistics();
369 
370  bool has_item( const graphene::net::item_id& id ) override;
371  void handle_message( const message& ) override;
372  bool handle_block( const graphene::net::block_message& block_message, bool sync_mode,
373  std::vector<message_hash_type>& contained_transaction_msg_ids ) override;
374  void handle_transaction( const graphene::net::trx_message& transaction_message ) override;
375  std::vector<item_hash_t> get_block_ids(const std::vector<item_hash_t>& blockchain_synopsis,
376  uint32_t& remaining_item_count,
377  uint32_t limit = 2000) override;
378  message get_item( const item_id& id ) override;
379  graphene::protocol::chain_id_type get_chain_id() const override;
380  std::vector<item_hash_t> get_blockchain_synopsis(const item_hash_t& reference_point,
381  uint32_t number_of_blocks_after_reference_point) override;
382  void sync_status( uint32_t item_type, uint32_t item_count ) override;
383  void connection_count_changed( uint32_t c ) override;
384  uint32_t get_block_number(const item_hash_t& block_id) override;
385  fc::time_point_sec get_block_time(const item_hash_t& block_id) override;
386  item_hash_t get_head_block_id() const override;
387  uint32_t estimate_last_known_fork_from_git_revision_timestamp(uint32_t unix_timestamp) const override;
388  void error_encountered(const std::string& message, const fc::oexception& error) override;
389  uint8_t get_current_block_interval_in_seconds() const override;
390 };
391 
395 {
398  bool accept_incoming_connections = true;
399  bool connect_to_new_peers = true;
400  bool wait_if_endpoint_is_busy = false;
408 };
409 
410 class node_impl : public peer_connection_delegate, public std::enable_shared_from_this<node_impl>
411 {
412 public:
414  {
415  public:
416  static std::shared_ptr<address_builder> create_default_address_builder();
417  void build( node_impl* impl, address_message& ) const;
418  virtual bool should_advertise(const fc::ip::endpoint& in ) const = 0;
419  virtual ~address_builder() = default;
420  };
421 
422 #ifdef P2P_IN_DEDICATED_THREAD
423  std::shared_ptr<fc::thread> _thread = std::make_shared<fc::thread>("p2p");
424  std::shared_ptr<fc::thread> get_thread() const { return _thread; }
425 #endif // P2P_IN_DEDICATED_THREAD
426  std::unique_ptr<statistics_gathering_node_delegate_wrapper> _delegate;
428 
429 #define NODE_CONFIGURATION_FILENAME "node_config.json"
430 #define POTENTIAL_PEER_DATABASE_FILENAME "peers.json"
433 
438  // Note: updating the type to optional may break 3rd-party client applications.
440 
444  std::list<potential_peer_record> _add_once_node_list;
445 
448  bool _potential_peer_db_updated = false;
451 
455  bool _sync_items_to_fetch_updated = false;
457 
458  typedef std::unordered_map<graphene::net::block_id_type, fc::time_point> active_sync_requests_map;
459 
461  active_sync_requests_map _active_sync_requests;
463  std::list<graphene::net::block_message> _new_received_sync_items;
466  std::list<graphene::net::block_message> _received_sync_items;
468 
470  bool _suspend_fetching_sync_blocks = false;
471 
475  bool _items_to_fetch_updated = false;
477 
478  struct item_id_index{};
479  using items_to_fetch_set_type = boost::multi_index_container< prioritized_item_id,
480  boost::multi_index::indexed_by<
481  boost::multi_index::ordered_unique< boost::multi_index::identity<prioritized_item_id> >,
482  boost::multi_index::hashed_unique<
483  boost::multi_index::tag<item_id_index>,
484  boost::multi_index::member<prioritized_item_id, item_id, &prioritized_item_id::item>,
485  std::hash<item_id>
486  >
487  >
488  >;
490  size_t _items_to_fetch_seq_counter = 0;
496 
504 
507  uint8_t _recent_block_interval_seconds = GRAPHENE_MAX_BLOCK_INTERVAL;
508 
509  std::string _user_agent_string;
525 
527  uint32_t _desired_number_of_connections = GRAPHENE_NET_DEFAULT_DESIRED_CONNECTIONS;
529  uint32_t _maximum_number_of_connections = GRAPHENE_NET_DEFAULT_MAX_CONNECTIONS;
531  uint32_t _peer_connection_retry_timeout = GRAPHENE_NET_DEFAULT_PEER_CONNECTION_RETRY_TIME;
533  uint32_t _peer_inactivity_timeout = GRAPHENE_NET_PEER_HANDSHAKE_INACTIVITY_TIMEOUT;
534 
537 
549 
551  boost::circular_buffer<item_hash_t> _most_recent_blocks_accepted { _maximum_number_of_connections };
552 
553  uint32_t _sync_item_type = 0;
555  uint32_t _total_num_of_unfetched_items = 0;
557  std::vector<uint32_t> _hard_fork_block_numbers;
558 
561 
562  fc::rate_limiting_group _rate_limiter { 0, 0 };
563 
565  uint32_t _last_reported_number_of_conns = 0;
566 
567  std::shared_ptr<address_builder> _address_builder = address_builder::create_default_address_builder();
568 
570 
572  boost::circular_buffer<uint32_t> _avg_net_read_speed_seconds { 60 };
574  boost::circular_buffer<uint32_t> _avg_net_write_speed_seconds { 60 };
576  boost::circular_buffer<uint32_t> _avg_net_read_speed_minutes { 60 };
578  boost::circular_buffer<uint32_t> _avg_net_write_speed_minutes { 60 };
580  boost::circular_buffer<uint32_t> _avg_net_read_speed_hours { 72 };
582  boost::circular_buffer<uint32_t> _avg_net_write_speed_hours { 72 };
584  size_t _avg_net_usage_second_counter = 0;
586  size_t _avg_net_usage_minute_counter = 0;
587 
590 
592 
602 //#define USE_PEERS_TO_DELETE_MUTEX 1
604 #ifdef USE_PEERS_TO_DELETE_MUTEX
605  fc::mutex _peers_to_delete_mutex;
606 #endif
607  std::list<peer_connection_ptr> _peers_to_delete;
610 
611 #ifdef ENABLE_P2P_DEBUGGING_API
612  std::set<node_id_t> _allowed_peers;
613 #endif // ENABLE_P2P_DEBUGGING_API
614 
617  bool _node_is_shutting_down = false;
618 
620  size_t _max_addrs_to_handle_at_once = MAX_ADDRESSES_TO_HANDLE_AT_ONCE;
622  size_t _max_blocks_to_handle_at_once = MAX_BLOCKS_TO_HANDLE_AT_ONCE;
624  size_t _max_sync_blocks_to_prefetch = MAX_SYNC_BLOCKS_TO_PREFETCH;
626  size_t _max_sync_blocks_per_peer = GRAPHENE_NET_MAX_BLOCKS_PER_PEER_DURING_SYNCING;
627 
628  std::list<fc::future<void> > _handle_message_calls_in_progress;
629 
632  boost::container::flat_set<std::string> _seed_nodes;
634  void update_seed_nodes_task();
635  void schedule_next_update_seed_nodes_task();
637 
638  explicit node_impl(const std::string& user_agent);
639  ~node_impl() override;
640 
641  void save_node_configuration();
642 
643  void p2p_network_connect_loop();
644  void trigger_p2p_network_connect_loop();
645 
646  bool have_already_received_sync_item( const item_hash_t& item_hash );
647  void request_sync_item_from_peer( const peer_connection_ptr& peer, const item_hash_t& item_to_request );
648  void request_sync_items_from_peer( const peer_connection_ptr& peer, const std::vector<item_hash_t>& items_to_request );
649  void fetch_sync_items_loop();
650  void trigger_fetch_sync_items_loop();
651 
652  bool is_item_in_any_peers_inventory(const item_id& item) const;
653  void fetch_items_loop();
654  void trigger_fetch_items_loop();
655 
656  void advertise_inventory_loop();
657  void trigger_advertise_inventory_loop();
658 
659  void kill_inactive_conns_loop(node_impl_ptr self);
660 
661  void fetch_updated_peer_lists_loop();
662  void update_bandwidth_data(uint32_t bytes_read_this_second, uint32_t bytes_written_this_second);
663  void bandwidth_monitor_loop();
664  void dump_node_status_task();
665 
666  bool is_accepting_new_connections();
667  bool is_wanting_new_connections();
668  uint32_t get_number_of_connections();
669  peer_connection_ptr get_peer_by_node_id(const node_id_t& id) const;
670 
671  bool merge_address_info_with_potential_peer_database( const std::vector<address_info> addresses );
672  void display_current_connections();
673  uint32_t calculate_unsynced_block_count_from_all_peers();
674  std::vector<item_hash_t> create_blockchain_synopsis_for_peer( const peer_connection* peer );
675  void fetch_next_batch_of_item_ids_from_peer( peer_connection* peer, bool reset_fork_tracking_data_for_peer = false );
676 
677  fc::variant_object generate_hello_user_data();
678  void parse_hello_user_data_for_peer( peer_connection* originating_peer, const fc::variant_object& user_data );
679 
680  void on_message( peer_connection* originating_peer,
681  const message& received_message ) override;
682 
683  void on_hello_message( peer_connection* originating_peer,
684  const hello_message& hello_message_received );
685 
686  void on_connection_accepted_message( peer_connection* originating_peer,
687  const connection_accepted_message& ) const;
688 
689  void on_connection_rejected_message( peer_connection* originating_peer,
690  const connection_rejected_message& connection_rejected_message_received );
691 
692  void on_address_request_message( peer_connection* originating_peer, const address_request_message&);
693 
694  void on_address_message( peer_connection* originating_peer,
695  const address_message& address_message_received );
696 
697  void on_fetch_blockchain_item_ids_message( peer_connection* originating_peer,
698  const fetch_blockchain_item_ids_message& fetch_blockchain_item_ids_message_received );
699 
700  void on_blockchain_item_ids_inventory_message( peer_connection* originating_peer,
701  const blockchain_item_ids_inventory_message& blockchain_item_ids_inventory_message_received );
702 
703  void on_fetch_items_message( peer_connection* originating_peer,
704  const fetch_items_message& fetch_items_message_received );
705 
706  void on_item_not_available_message( peer_connection* originating_peer,
707  const item_not_available_message& item_not_available_message_received );
708 
709  void on_item_ids_inventory_message( peer_connection* originating_peer,
710  const item_ids_inventory_message& item_ids_inventory_message_received );
711 
712  void on_closing_connection_message( peer_connection* originating_peer,
713  const closing_connection_message& closing_connection_message_received );
714 
715  void on_current_time_request_message( peer_connection* originating_peer,
716  const current_time_request_message& current_time_request_message_received );
717 
718  void on_current_time_reply_message( peer_connection* originating_peer,
719  const current_time_reply_message& current_time_reply_message_received );
720 
721  void on_connection_closed(peer_connection* originating_peer) override;
722 
723  void send_sync_block_to_node_delegate(const graphene::net::block_message& block_message_to_send);
724  void process_backlog_of_sync_blocks();
725  void trigger_process_backlog_of_sync_blocks();
726  void process_block_during_syncing(
727  peer_connection* originating_peer,
729  const message_hash_type& message_hash);
730  void process_block_when_in_sync(
731  peer_connection* originating_peer,
733  const message_hash_type& message_hash);
734  void process_block_message(
735  peer_connection* originating_peer,
736  const message& message_to_process,
737  const message_hash_type& message_hash);
738 
739  void process_ordinary_message(
740  peer_connection* originating_peer,
741  const message& message_to_process,
742  const message_hash_type& message_hash);
743 
744  void start_synchronizing();
745  void start_synchronizing_with_peer(const peer_connection_ptr& peer);
746 
748  void new_peer_just_added(const peer_connection_ptr& peer);
749 
750  void close();
751 
752  void accept_connection_task(peer_connection_ptr new_peer);
753  void accept_loop();
754  void send_hello_message(const peer_connection_ptr& peer);
755  void connect_to_task(peer_connection_ptr new_peer, const fc::ip::endpoint& remote_endpoint);
756  bool is_connected_to_endpoint(const fc::ip::endpoint& remote_endpoint) const;
757 
758  void move_peer_to_active_list(const peer_connection_ptr& peer);
759  void move_peer_to_closing_list(const peer_connection_ptr& peer);
760  void move_peer_to_terminating_list(const peer_connection_ptr& peer);
761 
762  /***
763  * Look for an active connection at the given address
764  * @param remote_endpoint the address we are interested in
765  * @returns the connection, or peer_connection_ptr() if not found
766  */
767  peer_connection_ptr get_active_conn_for_endpoint( const fc::ip::endpoint& remote_endpoint ) const;
768  /***
769  * Look for a connection that is either active or currently in the handshaking process
770  * @param remote_endpoint the address we are interested in
771  * @returns the connection, or peer_connection_ptr() if not found
772  */
773  peer_connection_ptr get_connection_for_endpoint( const fc::ip::endpoint& remote_endpoint ) const;
774 
775  void dump_node_status();
776 
777  void delayed_peer_deletion_task();
778  void schedule_peer_for_deletion(const peer_connection_ptr& peer_to_delete);
779 
780  void disconnect_from_peer( peer_connection* originating_peer,
781  const std::string& reason_for_disconnect,
782  bool caused_by_error = false,
783  const fc::oexception& additional_data = fc::oexception() );
784 
785  // methods implementing node's public interface
786  void set_node_delegate(std::shared_ptr<node_delegate> del, fc::thread* thread_for_delegate_calls);
787  void load_configuration( const fc::path& configuration_directory );
788  void listen_to_p2p_network();
789  void connect_to_p2p_network(node_impl_ptr self);
790  void add_node( const fc::ip::endpoint& ep );
791  void set_advertise_algorithm( const std::string& algo,
792  const std::vector<std::string>& advertise_or_exclude_list );
793  void add_seed_node( const std::string& seed_string );
794  void resolve_seed_node_and_add( const std::string& seed_string );
795  void initiate_connect_to(const peer_connection_ptr& peer);
796  void connect_to_endpoint(const fc::ip::endpoint& ep);
797  void set_listen_endpoint(const fc::ip::endpoint& ep , bool wait_if_not_available);
798  void set_inbound_endpoint( const fc::ip::endpoint& ep );
799  void set_accept_incoming_connections(bool accept);
800  void set_connect_to_new_peers( bool connect );
801 
802  fc::ip::endpoint get_actual_listening_endpoint() const;
803  std::vector<peer_status> get_connected_peers() const;
804  uint32_t get_connection_count() const;
805 
806  void broadcast(const message& item_to_broadcast, const message_propagation_data& propagation_data);
807  void broadcast(const message& item_to_broadcast);
808  void sync_from(const item_id& current_head_block, const std::vector<uint32_t>& hard_fork_block_numbers);
809  bool is_connected() const;
810  std::vector<potential_peer_record> get_potential_peers() const;
811  void set_advanced_node_parameters( const fc::variant_object& params );
812 
813  fc::variant_object get_advanced_node_parameters();
814  message_propagation_data get_tx_propagation_data(
815  const graphene::net::transaction_id_type& transaction_id ) const;
816  message_propagation_data get_block_propagation_data( const graphene::net::block_id_type& block_id ) const;
817 
818  node_id_t get_node_id() const;
819  void set_allowed_peers( const std::vector<node_id_t>& allowed_peers );
820  void clear_peer_database();
821  void set_total_bandwidth_limit( uint32_t upload_bytes_per_second,
822  uint32_t download_bytes_per_second );
823  fc::variant_object get_call_statistics() const;
824  graphene::net::message get_message_for_item(const item_id& item) override;
825 
826  fc::variant_object network_get_info() const;
827  fc::variant_object network_get_usage_stats() const;
828 
829  bool is_hard_fork_block(uint32_t block_number) const;
830  uint32_t get_next_known_hard_fork_block_number(uint32_t block_number) const;
831  }; // end class node_impl
832 
834  {
835  void operator()(node_impl*);
836  };
837 
838 }}} // end of namespace graphene::net::detail
839 
841  (listen_endpoint)
842  (inbound_endpoint)
843  (accept_incoming_connections)
844  (connect_to_new_peers)
845  (wait_if_endpoint_is_busy)
846  (private_key) )
prioritized_item_id(const item_id &item, size_t sequence_number)
Definition: node_impl.hxx:245
std::unordered_set< Key, Hash, Pred >::local_iterator end(size_t n)
Definition: node_impl.hxx:160
std::shared_ptr< detail::node_impl > node_impl_ptr
Definition: node.hpp:41
#define GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS
Definition: config.hpp:74
#define DECLARE_ACCUMULATOR(r, data, method_name)
Definition: node_impl.hxx:291
fc::promise< void >::ptr _retrigger_fetch_sync_items_loop_promise
Definition: node_impl.hxx:454
concurrent_unordered_set< item_id > _new_inventory
List of items we have received but not yet advertised to our peers.
Definition: node_impl.hxx:502
std::list< potential_peer_record > _add_once_node_list
Definition: node_impl.hxx:444
fc::future< void > _fetch_updated_peer_lists_loop_done
Definition: node_impl.hxx:569
#define FC_REFLECT(TYPE, MEMBERS)
Specializes fc::reflector for TYPE.
Definition: reflect.hpp:388
node_configuration _node_configuration
Definition: node_impl.hxx:432
fc::ip::endpoint _actual_listening_endpoint
Definition: node_impl.hxx:439
fc::future< void > _dump_node_status_task_done
Definition: node_impl.hxx:591
std::unordered_set< Key, Hash, Pred >::const_local_iterator end(size_t n) const
Definition: node_impl.hxx:165
An order-preserving dictionary of variants.
Definition: api.cpp:48
fc::future< void > _update_seed_nodes_loop_done
Definition: node_impl.hxx:633
microseconds milliseconds(int64_t s)
Definition: time.hpp:35
int64_t count() const
Definition: time.hpp:28
#define NODE_DELEGATE_METHOD_NAMES
Definition: node_impl.hxx:272
fc::future< void > _process_backlog_of_sync_blocks_done
Definition: node_impl.hxx:469
Used by the node to report status to the client or to fetch data from the client.
Definition: node.hpp:56
fc::future< void > _p2p_network_connect_loop_done
Definition: node_impl.hxx:449
active_sync_requests_map _active_sync_requests
List of sync blocks we've asked for from peers but have not yet received.
Definition: node_impl.hxx:461
fc::future< void > _advertise_inventory_loop_done
Definition: node_impl.hxx:500
void connect(AsyncSocket &sock, const EndpointType &ep)
wraps boost::asio::socket::async_connect
Definition: asio.hpp:262
#define GRAPHENE_NET_PEER_HANDSHAKE_INACTIVITY_TIMEOUT
Definition: config.hpp:50
#define GRAPHENE_NET_MAX_BLOCKS_PER_PEER_DURING_SYNCING
Definition: config.hpp:87
fc::future< void > _bandwidth_monitor_loop_done
Definition: node_impl.hxx:589
items_to_fetch_set_type _items_to_fetch
List of items we know another peer has and we want.
Definition: node_impl.hxx:492
std::unordered_set< Key, Hash, Pred >::iterator erase(typename std::unordered_set< Key, Hash, Pred >::const_iterator itr)
Definition: node_impl.hxx:108
std::unordered_set< Key, Hash, Pred >::iterator begin() noexcept
Definition: node_impl.hxx:130
constexpr size_t MAX_ADDRESSES_TO_HANDLE_AT_ONCE
Definition: config.hpp:116
constexpr size_t MAX_BLOCKS_TO_HANDLE_AT_ONCE
Definition: config.hpp:118
fc::ripemd160 block_id_type
Definition: types.hpp:304
std::unordered_set< Key, Hash, Pred >::local_iterator begin(size_t n)
Definition: node_impl.hxx:140
std::unordered_set< Key, Hash, Pred >::const_iterator begin() const noexcept
Definition: node_impl.hxx:135
void accept(AcceptorType &acc, SocketType &sock)
wraps boost::asio::async_accept
Definition: asio.hpp:250
provides stack-based nullable value similar to boost::optional
Definition: optional.hpp:20
#define GRAPHENE_NET_DEFAULT_DESIRED_CONNECTIONS
Definition: config.hpp:60
boost::container::flat_set< std::string > _seed_nodes
Definition: node_impl.hxx:632
std::unordered_set< Key, Hash, Pred >::iterator end() noexcept
Definition: node_impl.hxx:150
mutex
Definition: mutex.hpp:91
fc::future< void > _fetch_sync_items_loop_done
Definition: node_impl.hxx:456
std::list< graphene::net::block_message > _new_received_sync_items
List of sync blocks we've just received but haven't yet tried to process.
Definition: node_impl.hxx:463
#define GRAPHENE_NET_DEFAULT_MAX_CONNECTIONS
Definition: config.hpp:61
fc::optional< fc::ip::endpoint > inbound_endpoint
Definition: node_impl.hxx:397
fc::future< void > _kill_inactive_conns_loop_done
Definition: node_impl.hxx:505
std::pair< typename std::unordered_set< Key, Hash, Pred >::iterator, bool > insert(const Key &val)
Definition: node_impl.hxx:82
concurrent_unordered_set< graphene::net::peer_connection_ptr > _terminating_connections
Definition: node_impl.hxx:548
std::pair< typename std::unordered_set< Key, Hash, Pred >::iterator, bool > emplace(Key key)
Definition: node_impl.hxx:77
concurrent_unordered_set< graphene::net::peer_connection_ptr > _closing_connections
Definition: node_impl.hxx:545
fc::future< void > _accept_loop_complete
Definition: node_impl.hxx:536
concurrent_unordered_set< graphene::net::peer_connection_ptr > _handshaking_connections
Definition: node_impl.hxx:540
fc::future< void > _delayed_peer_deletion_task_done
Definition: node_impl.hxx:608
fc::mutex & get_mutex() const
Iterations require a lock. This exposes the mutex. Use with care (i.e. lock_guard) ...
Definition: node_impl.hxx:73
fc::time_point timestamp
the time we last heard about this item in an inventory message
Definition: node_impl.hxx:243
std::vector< uint32_t > _hard_fork_block_numbers
List of all block numbers where there are hard forks.
Definition: node_impl.hxx:557
std::shared_ptr< peer_connection > peer_connection_ptr
fc::promise< void >::ptr _retrigger_fetch_item_loop_promise
Definition: node_impl.hxx:474
std::unordered_set< Key, Hash, Pred >::const_iterator find(Key key)
Definition: node_impl.hxx:172
boost::multi_index_container< prioritized_item_id, boost::multi_index::indexed_by< boost::multi_index::ordered_unique< boost::multi_index::identity< prioritized_item_id > >, boost::multi_index::hashed_unique< boost::multi_index::tag< item_id_index >, boost::multi_index::member< prioritized_item_id, item_id, &prioritized_item_id::item >, std::hash< item_id > > > > items_to_fetch_set_type
Definition: node_impl.hxx:488
#define dlog(FORMAT,...)
Definition: logger.hpp:100
std::shared_ptr< fc::thread > get_thread() const
Definition: node_impl.hxx:424
#define GRAPHENE_NET_DEFAULT_PEER_CONNECTION_RETRY_TIME
Definition: config.hpp:42
fc::future< void > _fetch_item_loop_done
Definition: node_impl.hxx:476
boost::multi_index_container< timestamped_item_id, boost::multi_index::indexed_by< boost::multi_index::hashed_unique< boost::multi_index::member< timestamped_item_id, item_id, &timestamped_item_id::item >, std::hash< item_id > >, boost::multi_index::ordered_non_unique< boost::multi_index::tag< timestamp_index >, boost::multi_index::member< timestamped_item_id, fc::time_point_sec, &timestamped_item_id::timestamp > > > > timestamped_items_set_type
Definition: api.hpp:15
fc::promise< void >::ptr _retrigger_advertise_inventory_loop_promise
Definition: node_impl.hxx:499
bool operator<(const prioritized_item_id &rhs) const
Definition: node_impl.hxx:250
#define GRAPHENE_MAX_BLOCK_INTERVAL
Definition: config.hpp:60
std::unordered_set< Key, Hash, Pred >::const_local_iterator begin(size_t n) const
Definition: node_impl.hxx:145
static time_point now()
Definition: time.cpp:13
std::list< fc::future< void > > _handle_message_calls_in_progress
Definition: node_impl.hxx:628
an elliptic curve private key.
Definition: elliptic.hpp:89
fc::ripemd160 transaction_id_type
Definition: types.hpp:306
void swap(typename std::unordered_set< Key, Hash, Pred > &other) noexcept
Definition: node_impl.hxx:122
std::unordered_set< Key, Hash, Pred >::const_iterator end() const noexcept
Definition: node_impl.hxx:155
wraps boost::filesystem::path to provide platform independent path manipulation.
Definition: filesystem.hpp:28
peer_connection::timestamped_items_set_type _recently_failed_items
List of transactions we've recently pushed and had rejected by the delegate.
Definition: node_impl.hxx:494
std::list< graphene::net::block_message > _received_sync_items
Definition: node_impl.hxx:466
fc::promise< void >::ptr _retrigger_connect_loop_promise
Definition: node_impl.hxx:447
std::list< peer_connection_ptr > _peers_to_delete
Definition: node_impl.hxx:607
std::unique_ptr< statistics_gathering_node_delegate_wrapper > _delegate
Definition: node_impl.hxx:426
fc::time_point_sec _bandwidth_monitor_last_update_time
Definition: node_impl.hxx:588
constexpr size_t MAX_SYNC_BLOCKS_TO_PREFETCH
Definition: config.hpp:119
concurrent_unordered_set< graphene::net::peer_connection_ptr > _active_connections
Definition: node_impl.hxx:542
std::unordered_map< graphene::net::block_id_type, fc::time_point > active_sync_requests_map
Definition: node_impl.hxx:458
blockchain_tied_message_cache _message_cache
Cache message we have received and might be required to provide to other peers via inventory requests...
Definition: node_impl.hxx:560