26 #define P2P_IN_DEDICATED_THREAD 31 # undef DEFAULT_LOGGER 33 #define DEFAULT_LOGGER "p2p" 36 #ifdef GRAPHENE_TEST_NETWORK 37 #define testnetlog wlog 39 #define testnetlog(...) do {} while (0) 43 #include <boost/accumulators/accumulators.hpp> 44 #include <boost/accumulators/statistics.hpp> 45 #include <boost/accumulators/statistics/rolling_mean.hpp> 58 namespace graphene {
namespace net {
namespace detail {
65 template <
class Key,
class Hash = std::hash<Key>,
class Pred = std::equal_to<Key> >
77 std::pair< typename std::unordered_set<Key, Hash, Pred>::iterator,
bool>
emplace( Key key)
80 return std::unordered_set<Key, Hash, Pred>::emplace( key );
82 std::pair< typename std::unordered_set<Key, Hash, Pred>::iterator,
bool>
insert (
const Key& val)
85 return std::unordered_set<Key, Hash, Pred>::insert( val );
93 return std::unordered_set<Key, Hash, Pred>::size();
98 return std::unordered_set<Key, Hash, Pred>::empty();
106 std::unordered_set<Key, Hash, Pred>::clear();
108 typename std::unordered_set<Key, Hash, Pred>::iterator
erase(
109 typename std::unordered_set<Key, Hash, Pred>::const_iterator itr)
112 return std::unordered_set<Key, Hash, Pred>::erase( itr);
117 return std::unordered_set<Key, Hash, Pred>::erase( key );
122 void swap(
typename std::unordered_set<Key, Hash, Pred>& other ) noexcept
125 std::unordered_set<Key, Hash, Pred>::swap( other );
130 typename std::unordered_set<Key, Hash, Pred>::iterator
begin() noexcept
133 return std::unordered_set<Key, Hash, Pred>::begin();
135 typename std::unordered_set<Key, Hash, Pred>::const_iterator
begin() const noexcept
138 return std::unordered_set<Key, Hash, Pred>::begin();
140 typename std::unordered_set<Key, Hash, Pred>::local_iterator
begin(
size_t n)
143 return std::unordered_set<Key, Hash, Pred>::begin(n);
145 typename std::unordered_set<Key, Hash, Pred>::const_local_iterator
begin(
size_t n)
const 148 return std::unordered_set<Key, Hash, Pred>::begin(n);
150 typename std::unordered_set<Key, Hash, Pred>::iterator
end() noexcept
153 return std::unordered_set<Key, Hash, Pred>::end();
155 typename std::unordered_set<Key, Hash, Pred>::const_iterator
end() const noexcept
158 return std::unordered_set<Key, Hash, Pred>::end();
160 typename std::unordered_set<Key, Hash, Pred>::local_iterator
end(
size_t n)
163 return std::unordered_set<Key, Hash, Pred>::end(n);
165 typename std::unordered_set<Key, Hash, Pred>::const_local_iterator
end(
size_t n)
const 168 return std::unordered_set<Key, Hash, Pred>::end(n);
172 typename std::unordered_set<Key, Hash, Pred>::const_iterator
find(Key key)
175 return std::unordered_set<Key, Hash, Pred>::find(key);
// Empty tag type: used as bmi::tag<message_hash_index> to name the ordered_unique
// index of message_cache_container keyed on message_info::message_hash.
184 struct message_hash_index{};
// Empty tag type: used as bmi::tag<message_contents_hash_index> to name the
// ordered_non_unique index keyed on message_info::message_contents_hash.
185 struct message_contents_hash_index{};
// Empty tag type: used as bmi::tag<block_clock_index> to name the
// ordered_non_unique index keyed on message_info::block_clock_when_received.
186 struct block_clock_index{};
191 uint32_t block_clock_when_received;
201 uint32_t block_clock_when_received,
204 message_hash( message_hash ),
205 message_body( message_body ),
206 block_clock_when_received( block_clock_when_received ),
207 propagation_data( propagation_data ),
208 message_contents_hash( message_contents_hash )
212 using message_cache_container = boost::multi_index_container < message_info,
214 bmi::ordered_unique< bmi::tag<message_hash_index>,
215 bmi::member<message_info, message_hash_type, &message_info::message_hash> >,
216 bmi::ordered_non_unique< bmi::tag<message_contents_hash_index>,
217 bmi::member<message_info, message_hash_type, &message_info::message_contents_hash> >,
218 bmi::ordered_non_unique< bmi::tag<block_clock_index>,
219 bmi::member<message_info, uint32_t, &message_info::block_clock_when_received> > > >;
221 message_cache_container _message_cache;
223 uint32_t block_clock = 0;
226 void block_accepted();
227 void cache_message(
const message& message_to_cache,
// Returns the number of entries currently stored in _message_cache.
234 size_t size()
const {
return _message_cache.size(); }
247 sequence_number(sequence_number),
248 timestamp(
fc::time_point::now())
253 "block_message_type must be greater than trx_message_type for prioritized_item_ids to sort correctly");
263 std::shared_ptr<node_delegate> _node_delegate;
// Accumulator over int64_t samples that tracks min, rolling mean, max, sum and count.
// The call_statistics_collector destructor feeds it fc::microseconds::count() values
// (execution time, delay before, delay after) through call_stats_accumulator pointers.
// NOTE(review): Boost's rolling_mean needs a window size supplied when the accumulator
// is constructed; that construction is not visible here — confirm where it is set.
266 using call_stats_accumulator = boost::accumulators::accumulator_set< int64_t,
267 boost::accumulators::stats< boost::accumulators::tag::min,
268 boost::accumulators::tag::rolling_mean,
269 boost::accumulators::tag::max,
270 boost::accumulators::tag::sum,
271 boost::accumulators::tag::count> >;
272 #define NODE_DELEGATE_METHOD_NAMES (has_item) \ 275 (handle_transaction) \ 279 (get_blockchain_synopsis) \ 281 (connection_count_changed) \ 284 (get_head_block_id) \ 285 (estimate_last_known_fork_from_git_revision_timestamp) \ 286 (error_encountered) \ 287 (get_current_block_interval_in_seconds) 291 #define DECLARE_ACCUMULATOR(r, data, method_name) \ 292 mutable call_stats_accumulator BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _execution_accumulator)); \ 293 mutable call_stats_accumulator BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _delay_before_accumulator)); \ 294 mutable call_stats_accumulator BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _delay_after_accumulator)); 296 #undef DECLARE_ACCUMULATOR 298 class call_statistics_collector
304 const char* _method_name;
305 call_stats_accumulator* _execution_accumulator;
306 call_stats_accumulator* _delay_before_accumulator;
307 call_stats_accumulator* _delay_after_accumulator;
311 std::shared_ptr<call_statistics_collector> _collector;
314 _collector(collector)
316 _collector->starting_execution();
320 _collector->execution_completed();
323 call_statistics_collector(
const char* method_name,
324 call_stats_accumulator* execution_accumulator,
325 call_stats_accumulator* delay_before_accumulator,
326 call_stats_accumulator* delay_after_accumulator) :
328 _method_name(method_name),
329 _execution_accumulator(execution_accumulator),
330 _delay_before_accumulator(delay_before_accumulator),
331 _delay_after_accumulator(delay_after_accumulator)
333 ~call_statistics_collector()
336 fc::microseconds actual_execution_time(_execution_completed_time - _begin_execution_time);
337 fc::microseconds delay_before(_begin_execution_time - _call_requested_time);
339 fc::microseconds total_duration(actual_execution_time + delay_before + delay_after);
340 (*_execution_accumulator)(actual_execution_time.
count());
341 (*_delay_before_accumulator)(delay_before.
count());
342 (*_delay_after_accumulator)(delay_after.
count());
345 dlog(
"Call to method node_delegate::${method} took ${total_duration}us, longer than our target maximum of 500ms",
346 (
"method", _method_name)
347 (
"total_duration", total_duration.
count()));
348 dlog(
"Actual execution took ${execution_duration}us, with a ${delegate_delay}us delay before the delegate thread started " 349 "executing the method, and a ${p2p_delay}us delay after it finished before the p2p thread started processing the response",
350 (
"execution_duration", actual_execution_time)
351 (
"delegate_delay", delay_before)
352 (
"p2p_delay", delay_after));
355 void starting_execution()
359 void execution_completed()
371 void handle_message(
const message& )
override;
373 std::vector<message_hash_type>& contained_transaction_msg_ids )
override;
375 std::vector<item_hash_t> get_block_ids(
const std::vector<item_hash_t>& blockchain_synopsis,
376 uint32_t& remaining_item_count,
377 uint32_t limit = 2000)
override;
380 std::vector<item_hash_t> get_blockchain_synopsis(
const item_hash_t& reference_point,
381 uint32_t number_of_blocks_after_reference_point)
override;
382 void sync_status( uint32_t item_type, uint32_t item_count )
override;
383 void connection_count_changed( uint32_t c )
override;
384 uint32_t get_block_number(
const item_hash_t& block_id)
override;
387 uint32_t estimate_last_known_fork_from_git_revision_timestamp(uint32_t unix_timestamp)
const override;
389 uint8_t get_current_block_interval_in_seconds()
const override;
398 bool accept_incoming_connections =
true;
399 bool connect_to_new_peers =
true;
400 bool wait_if_endpoint_is_busy =
false;
416 static std::shared_ptr<address_builder> create_default_address_builder();
422 #ifdef P2P_IN_DEDICATED_THREAD 423 std::shared_ptr<fc::thread> _thread = std::make_shared<fc::thread>(
"p2p");
// Returns a copy of the shared_ptr stored in _thread, the fc::thread created with the
// name "p2p". Only declared when P2P_IN_DEDICATED_THREAD is defined.
424 std::shared_ptr<fc::thread>
get_thread()
const {
return _thread; }
425 #endif // P2P_IN_DEDICATED_THREAD 426 std::unique_ptr<statistics_gathering_node_delegate_wrapper>
_delegate;
429 #define NODE_CONFIGURATION_FILENAME "node_config.json" 430 #define POTENTIAL_PEER_DATABASE_FILENAME "peers.json" 448 bool _potential_peer_db_updated =
false;
455 bool _sync_items_to_fetch_updated =
false;
470 bool _suspend_fetching_sync_blocks =
false;
475 bool _items_to_fetch_updated =
false;
480 boost::multi_index::indexed_by<
481 boost::multi_index::ordered_unique< boost::multi_index::identity<prioritized_item_id> >,
482 boost::multi_index::hashed_unique<
483 boost::multi_index::tag<item_id_index>,
484 boost::multi_index::member<prioritized_item_id, item_id, &prioritized_item_id::item>,
490 size_t _items_to_fetch_seq_counter = 0;
551 boost::circular_buffer<item_hash_t> _most_recent_blocks_accepted { _maximum_number_of_connections };
553 uint32_t _sync_item_type = 0;
555 uint32_t _total_num_of_unfetched_items = 0;
565 uint32_t _last_reported_number_of_conns = 0;
567 std::shared_ptr<address_builder> _address_builder = address_builder::create_default_address_builder();
572 boost::circular_buffer<uint32_t> _avg_net_read_speed_seconds { 60 };
574 boost::circular_buffer<uint32_t> _avg_net_write_speed_seconds { 60 };
576 boost::circular_buffer<uint32_t> _avg_net_read_speed_minutes { 60 };
578 boost::circular_buffer<uint32_t> _avg_net_write_speed_minutes { 60 };
580 boost::circular_buffer<uint32_t> _avg_net_read_speed_hours { 72 };
582 boost::circular_buffer<uint32_t> _avg_net_write_speed_hours { 72 };
584 size_t _avg_net_usage_second_counter = 0;
586 size_t _avg_net_usage_minute_counter = 0;
604 #ifdef USE_PEERS_TO_DELETE_MUTEX 611 #ifdef ENABLE_P2P_DEBUGGING_API 612 std::set<node_id_t> _allowed_peers;
613 #endif // ENABLE_P2P_DEBUGGING_API 617 bool _node_is_shutting_down =
false;
634 void update_seed_nodes_task();
635 void schedule_next_update_seed_nodes_task();
638 explicit node_impl(
const std::string& user_agent);
641 void save_node_configuration();
643 void p2p_network_connect_loop();
644 void trigger_p2p_network_connect_loop();
646 bool have_already_received_sync_item(
const item_hash_t& item_hash );
648 void request_sync_items_from_peer(
const peer_connection_ptr& peer,
const std::vector<item_hash_t>& items_to_request );
649 void fetch_sync_items_loop();
650 void trigger_fetch_sync_items_loop();
652 bool is_item_in_any_peers_inventory(
const item_id& item)
const;
653 void fetch_items_loop();
654 void trigger_fetch_items_loop();
656 void advertise_inventory_loop();
657 void trigger_advertise_inventory_loop();
661 void fetch_updated_peer_lists_loop();
662 void update_bandwidth_data(uint32_t bytes_read_this_second, uint32_t bytes_written_this_second);
663 void bandwidth_monitor_loop();
664 void dump_node_status_task();
666 bool is_accepting_new_connections();
667 bool is_wanting_new_connections();
668 uint32_t get_number_of_connections();
671 bool merge_address_info_with_potential_peer_database(
const std::vector<address_info> addresses );
672 void display_current_connections();
673 uint32_t calculate_unsynced_block_count_from_all_peers();
674 std::vector<item_hash_t> create_blockchain_synopsis_for_peer(
const peer_connection* peer );
675 void fetch_next_batch_of_item_ids_from_peer(
peer_connection* peer,
bool reset_fork_tracking_data_for_peer =
false );
681 const message& received_message )
override;
686 void on_connection_accepted_message(
peer_connection* originating_peer,
689 void on_connection_rejected_message(
peer_connection* originating_peer,
697 void on_fetch_blockchain_item_ids_message(
peer_connection* originating_peer,
700 void on_blockchain_item_ids_inventory_message(
peer_connection* originating_peer,
715 void on_current_time_request_message(
peer_connection* originating_peer,
724 void process_backlog_of_sync_blocks();
725 void trigger_process_backlog_of_sync_blocks();
726 void process_block_during_syncing(
730 void process_block_when_in_sync(
734 void process_block_message(
736 const message& message_to_process,
739 void process_ordinary_message(
741 const message& message_to_process,
744 void start_synchronizing();
756 bool is_connected_to_endpoint(
const fc::ip::endpoint& remote_endpoint)
const;
775 void dump_node_status();
777 void delayed_peer_deletion_task();
781 const std::string& reason_for_disconnect,
782 bool caused_by_error =
false,
786 void set_node_delegate(std::shared_ptr<node_delegate> del,
fc::thread* thread_for_delegate_calls);
787 void load_configuration(
const fc::path& configuration_directory );
788 void listen_to_p2p_network();
791 void set_advertise_algorithm(
const std::string& algo,
792 const std::vector<std::string>& advertise_or_exclude_list );
793 void add_seed_node(
const std::string& seed_string );
794 void resolve_seed_node_and_add(
const std::string& seed_string );
797 void set_listen_endpoint(
const fc::ip::endpoint& ep ,
bool wait_if_not_available);
799 void set_accept_incoming_connections(
bool accept);
800 void set_connect_to_new_peers(
bool connect );
803 std::vector<peer_status> get_connected_peers()
const;
804 uint32_t get_connection_count()
const;
807 void broadcast(
const message& item_to_broadcast);
808 void sync_from(
const item_id& current_head_block,
const std::vector<uint32_t>& hard_fork_block_numbers);
809 bool is_connected()
const;
810 std::vector<potential_peer_record> get_potential_peers()
const;
819 void set_allowed_peers(
const std::vector<node_id_t>& allowed_peers );
820 void clear_peer_database();
821 void set_total_bandwidth_limit( uint32_t upload_bytes_per_second,
822 uint32_t download_bytes_per_second );
829 bool is_hard_fork_block(uint32_t block_number)
const;
830 uint32_t get_next_known_hard_fork_block_number(uint32_t block_number)
const;
843 (accept_incoming_connections)
844 (connect_to_new_peers)
845 (wait_if_endpoint_is_busy)
prioritized_item_id(const item_id &item, size_t sequence_number)
std::unordered_set< Key, Hash, Pred >::local_iterator end(size_t n)
std::shared_ptr< detail::node_impl > node_impl_ptr
#define GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS
#define DECLARE_ACCUMULATOR(r, data, method_name)
fc::promise< void >::ptr _retrigger_fetch_sync_items_loop_promise
concurrent_unordered_set< item_id > _new_inventory
List of items we have received but not yet advertised to our peers.
std::list< potential_peer_record > _add_once_node_list
fc::future< void > _fetch_updated_peer_lists_loop_done
#define FC_REFLECT(TYPE, MEMBERS)
Specializes fc::reflector for TYPE.
node_configuration _node_configuration
fc::ip::endpoint _actual_listening_endpoint
fc::future< void > _dump_node_status_task_done
std::unordered_set< Key, Hash, Pred >::const_local_iterator end(size_t n) const
An order-preserving dictionary of variants.
fc::future< void > _update_seed_nodes_loop_done
actual_execution_measurement_helper(std::shared_ptr< call_statistics_collector > collector)
microseconds milliseconds(int64_t s)
#define NODE_DELEGATE_METHOD_NAMES
fc::future< void > _process_backlog_of_sync_blocks_done
Used by the node to report status to the client or to fetch data from the client.
fc::future< void > _p2p_network_connect_loop_done
active_sync_requests_map _active_sync_requests
List of sync blocks we've asked for from peers but have not yet received.
fc::future< void > _advertise_inventory_loop_done
void connect(AsyncSocket &sock, const EndpointType &ep)
wraps boost::asio::socket::async_connect
#define GRAPHENE_NET_PEER_HANDSHAKE_INACTIVITY_TIMEOUT
#define GRAPHENE_NET_MAX_BLOCKS_PER_PEER_DURING_SYNCING
fc::future< void > _bandwidth_monitor_loop_done
items_to_fetch_set_type _items_to_fetch
List of items we know another peer has and we want.
std::unordered_set< Key, Hash, Pred >::iterator erase(typename std::unordered_set< Key, Hash, Pred >::const_iterator itr)
std::unordered_set< Key, Hash, Pred >::iterator begin() noexcept
constexpr size_t MAX_ADDRESSES_TO_HANDLE_AT_ONCE
constexpr size_t MAX_BLOCKS_TO_HANDLE_AT_ONCE
fc::ripemd160 block_id_type
~actual_execution_measurement_helper()
std::unordered_set< Key, Hash, Pred >::local_iterator begin(size_t n)
std::unordered_set< Key, Hash, Pred >::const_iterator begin() const noexcept
void accept(AcceptorType &acc, SocketType &sock)
wraps boost::asio::async_accept
provides stack-based nullable value similar to boost::optional
#define GRAPHENE_NET_DEFAULT_DESIRED_CONNECTIONS
boost::container::flat_set< std::string > _seed_nodes
std::unordered_set< Key, Hash, Pred >::iterator end() noexcept
fc::future< void > _fetch_sync_items_loop_done
std::list< graphene::net::block_message > _new_received_sync_items
List of sync blocks we've just received but haven't yet tried to process.
#define GRAPHENE_NET_DEFAULT_MAX_CONNECTIONS
fc::optional< fc::ip::endpoint > inbound_endpoint
fc::tcp_server _tcp_server
fc::future< void > _kill_inactive_conns_loop_done
std::pair< typename std::unordered_set< Key, Hash, Pred >::iterator, bool > insert(const Key &val)
concurrent_unordered_set< graphene::net::peer_connection_ptr > _terminating_connections
std::pair< typename std::unordered_set< Key, Hash, Pred >::iterator, bool > emplace(Key key)
concurrent_unordered_set< graphene::net::peer_connection_ptr > _closing_connections
fc::future< void > _accept_loop_complete
fc::ecc::private_key private_key
concurrent_unordered_set< graphene::net::peer_connection_ptr > _handshaking_connections
fc::future< void > _delayed_peer_deletion_task_done
std::string _user_agent_string
fc::mutex & get_mutex() const
Iterations require a lock. This exposes the mutex. Use with care (i.e. lock_guard) ...
fc::time_point timestamp
the time we last heard about this item in an inventory message
fc::ip::endpoint listen_endpoint
std::vector< uint32_t > _hard_fork_block_numbers
List of all block numbers where there are hard forks.
std::shared_ptr< peer_connection > peer_connection_ptr
fc::promise< void >::ptr _retrigger_fetch_item_loop_promise
std::unordered_set< Key, Hash, Pred >::const_iterator find(Key key)
boost::multi_index_container< prioritized_item_id, boost::multi_index::indexed_by< boost::multi_index::ordered_unique< boost::multi_index::identity< prioritized_item_id > >, boost::multi_index::hashed_unique< boost::multi_index::tag< item_id_index >, boost::multi_index::member< prioritized_item_id, item_id, &prioritized_item_id::item >, std::hash< item_id > > > > items_to_fetch_set_type
std::shared_ptr< fc::thread > get_thread() const
#define GRAPHENE_NET_DEFAULT_PEER_CONNECTION_RETRY_TIME
fc::future< void > _fetch_item_loop_done
boost::multi_index_container< timestamped_item_id, boost::multi_index::indexed_by< boost::multi_index::hashed_unique< boost::multi_index::member< timestamped_item_id, item_id, &timestamped_item_id::item >, std::hash< item_id > >, boost::multi_index::ordered_non_unique< boost::multi_index::tag< timestamp_index >, boost::multi_index::member< timestamped_item_id, fc::time_point_sec, &timestamped_item_id::timestamp > > > > timestamped_items_set_type
fc::promise< void >::ptr _retrigger_advertise_inventory_loop_promise
fc::path _node_configuration_directory
node_id_t _node_public_key
bool operator<(const prioritized_item_id &rhs) const
#define GRAPHENE_MAX_BLOCK_INTERVAL
std::unordered_set< Key, Hash, Pred >::const_local_iterator begin(size_t n) const
std::list< fc::future< void > > _handle_message_calls_in_progress
an elliptic curve private key.
fc::ripemd160 transaction_id_type
void swap(typename std::unordered_set< Key, Hash, Pred > &other) noexcept
std::unordered_set< Key, Hash, Pred >::const_iterator end() const noexcept
bool empty() const noexcept
wraps boost::filesystem::path to provide platform independent path manipulation.
peer_connection::timestamped_items_set_type _recently_failed_items
List of transactions we've recently pushed and had rejected by the delegate.
std::list< graphene::net::block_message > _received_sync_items
fc::promise< void >::ptr _retrigger_connect_loop_promise
std::list< peer_connection_ptr > _peers_to_delete
std::unique_ptr< statistics_gathering_node_delegate_wrapper > _delegate
fc::time_point_sec _bandwidth_monitor_last_update_time
constexpr size_t MAX_SYNC_BLOCKS_TO_PREFETCH
concurrent_unordered_set< graphene::net::peer_connection_ptr > _active_connections
std::unordered_map< graphene::net::block_id_type, fc::time_point > active_sync_requests_map
size_t erase(const Key &key)
peer_database _potential_peer_db
blockchain_tied_message_cache _message_cache
Cache message we have received and might be required to provide to other peers via inventory requests...