BitShares-Core  6.1.0
BitShares blockchain implementation and command-line interface software
delayed_node_plugin.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Cryptonomex, Inc., and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 
28 #include <graphene/app/api.hpp>
29 
31 #include <fc/rpc/websocket_api.hpp>
32 #include <fc/api.hpp>
33 
34 namespace graphene { namespace delayed_node {
35 namespace bpo = boost::program_options;
36 
37 namespace detail {
    // The members below are read and written by the plugin methods through `my`.
    // Endpoint of the trusted node; plugin_initialize() sets it to "ws://" + the trusted-node option.
39  std::string remote_endpoint;
    // RPC API connection object; connect() creates it with std::make_shared.
41  std::shared_ptr<fc::rpc::websocket_api_connection> client_connection;
46 };
47 }
48 
50  plugin(app)
51 {
52  // `app` is handed to plugin(app) above; construction does no other work.
53 }
54 
56 
57 void delayed_node_plugin::plugin_set_program_options(bpo::options_description& cli, bpo::options_description& cfg)
58 {
59  cli.add_options()
60  ("trusted-node", boost::program_options::value<std::string>(),
61  "RPC endpoint of a trusted validating node (required for delayed_node)")
62  ;
63  cfg.add(cli);
64 }
65 
67 {
    // Try to open the websocket to the trusted node. An fc::exception is logged as a
    // warning and ends this attempt without touching the API state below.
69  try
70  {
71  con = my->client.connect(my->remote_endpoint);
72  }
73  catch( const fc::exception& e )
74  {
75  wlog("Error while connecting: ${e}", ("e", e.to_detail_string()));
77  return;
78  }
    // Build the RPC API connection object and keep it in the plugin state.
79  my->client_connection = std::make_shared<fc::rpc::websocket_api_connection>(
    // Obtain the remote graphene::app::database_api handle (requested with argument 0).
81  my->database_api = my->client_connection->get_remote_api<graphene::app::database_api>(0);
    // Each block-applied notification delivers a block id as a variant; it is decoded into
    // last_received_remote_head, which mainloop compares against last_processed_remote_head.
82  my->database_api->set_block_applied_callback([this]( const fc::variant& block_id )
83  {
84  fc::from_variant( block_id, my->last_received_remote_head, GRAPHENE_MAX_NESTED_OBJECTS );
85  } );
    // Subscribe to the connection's `closed` signal and keep the subscription handle.
86  my->client_connection_closed = my->client_connection->closed.connect([this] {
88  });
89 }
90 
91 void delayed_node_plugin::plugin_initialize(const boost::program_options::variables_map& options)
92 {
93  FC_ASSERT(options.count("trusted-node") > 0);
94  my = std::make_unique<detail::delayed_node_plugin_impl>();
95  my->remote_endpoint = "ws://" + options.at("trusted-node").as<std::string>();
96 }
97 
99 {
    // Catch up with the trusted node: push blocks into the local database until the local
    // head block number reaches the remote node's last irreversible block number.
100  auto& db = database();
101  uint32_t synced_blocks = 0;
102  uint32_t pass_count = 0;
103  while( true )
104  {
    // The remote properties are fetched again on every pass, so blocks that became
    // irreversible while the previous pass was running are picked up as well.
105  graphene::chain::dynamic_global_property_object remote_dpo = my->database_api->get_dynamic_global_properties();
106  if( remote_dpo.last_irreversible_block_num <= db.head_block_num() )
107  {
    // Nothing left to fetch. A remote value strictly below the local head only triggers a warning.
108  if( remote_dpo.last_irreversible_block_num < db.head_block_num() )
109  {
110  wlog( "Trusted node seems to be behind delayed node" );
111  }
    // The summary line is only written when more than one block was applied in this call.
112  if( synced_blocks > 1 )
113  {
114  ilog( "Delayed node finished syncing ${n} blocks in ${k} passes", ("n", synced_blocks)("k", pass_count) );
115  }
116  break;
117  }
118  pass_count++;
    // Apply blocks one at a time, always requesting the block right after the local head.
119  while( remote_dpo.last_irreversible_block_num > db.head_block_num() )
120  {
121  fc::optional<graphene::chain::signed_block> block = my->database_api->get_block( db.head_block_num()+1 );
122  // TODO: during sync, decouple requesting blocks from preprocessing + applying them
    // An empty optional means the remote node did not return a block it reported as irreversible.
123  FC_ASSERT(block, "Trusted node claims it has blocks it doesn't actually have.");
124  ilog("Pushing block #${n}", ("n", block->block_num()));
    // Wait for the precompute step to complete before the block is pushed.
125  db.precompute_parallel( *block, graphene::chain::database::skip_nothing ).wait();
126  db.push_block(*block);
127  synced_blocks++;
128  }
129  }
130 }
131 
133 {
    // Endless polling loop: sleep, then check whether a remote head has been received that
    // was not yet processed. fc::exceptions are logged and the loop keeps running.
134  while( true )
135  {
136  try
137  {
138  fc::usleep( fc::microseconds( 296645 ) ); // wake up a little over 3Hz
139 
    // Same head as last time: nothing new to process on this wake-up.
140  if( my->last_received_remote_head == my->last_processed_remote_head )
141  continue;
142 
    // NOTE(review): nothing visible here acts on the new head before it is marked as
    // processed - confirm that the sync step sits between the check above and this line.
144  my->last_processed_remote_head = my->last_received_remote_head;
145  }
146  catch( const fc::exception& e )
147  {
148  elog("Error during connection: ${e}", ("e", e.to_detail_string()));
149  }
150  }
151 }
152 
154 {
    // Hand mainloop() to fc::async (the returned future is not kept), then make the
    // first connection attempt.
155  fc::async([this]()
156  {
157  mainloop();
158  });
159 
160  connect();
161 }
162 
164 {
    // Drop any head that was received but not yet processed, so mainloop sees the two
    // ids as equal and has nothing pending.
165  my->last_received_remote_head = my->last_processed_remote_head;
    // NOTE(review): the message promises a retry in 5 seconds, but no scheduling call is
    // visible in this block - confirm the reconnect is actually scheduled.
166  elog("Connection to trusted node failed; retrying in 5 seconds...");
168 }
169 
170 } }
auto async(Functor &&f, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> fc::future< decltype(f())>
Definition: thread.hpp:227
#define GRAPHENE_MAX_NESTED_OBJECTS
Definition: config.hpp:33
Maintains global state information (committee_member list, current fees). This is an implementation det...
#define GRAPHENE_NET_MAX_NESTED_OBJECTS
Definition: config.hpp:112
Definition: api.cpp:48
#define elog(FORMAT,...)
Definition: logger.hpp:129
void plugin_set_program_options(boost::program_options::options_description &, boost::program_options::options_description &cfg) override
Fill in command line parameters used by the plugin.
Used to generate a useful error report when an exception is thrown. At each level in the stack where t...
Definition: exception.hpp:56
#define wlog(FORMAT,...)
Definition: logger.hpp:123
std::shared_ptr< websocket_connection > websocket_connection_ptr
Definition: websocket.hpp:47
std::shared_ptr< fc::rpc::websocket_api_connection > client_connection
provides stack-based nullable value similar to boost::optional
Definition: optional.hpp:20
delayed_node_plugin(graphene::app::application &app)
void usleep(const microseconds &u)
Definition: thread.cpp:368
uint32_t block_num() const
Definition: block.hpp:34
chain::database & database()
Definition: plugin.hpp:115
microseconds seconds(int64_t s)
Definition: time.hpp:34
#define ilog(FORMAT,...)
Definition: logger.hpp:117
void plugin_initialize(const boost::program_options::variables_map &options) override
Perform early startup routines and register plugin indexes, callbacks, etc.
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
Definition: exception.hpp:345
void plugin_startup() override
Begin normal runtime operations.
The database_api class implements the RPC API for the chain database.
stores null, int64, uint64, double, bool, string, std::vector<variant>, and variant_object's.
Definition: variant.hpp:198
std::string to_detail_string(log_level ll=log_level::all) const
Definition: exception.cpp:183
void from_variant(const variant &var, flat_set< T, A... > &vo, uint32_t _max_depth)
Definition: flat.hpp:116
static time_point now()
Definition: time.cpp:13
boost::signals2::scoped_connection scoped_connection
Definition: signals.hpp:22
auto schedule(Functor &&f, const fc::time_point &t, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> fc::future< decltype(f())>
Definition: thread.hpp:231