BitShares-Core  5.0.0
BitShares blockchain implementation and command-line interface software
es_objects.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 oxarbitrage, and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 
26 
27 #include <curl/curl.h>
33 
35 
36 namespace graphene { namespace es_objects {
37 
38 namespace detail
39 {
40 
42 {
    // Private implementation of the es_objects plugin: holds the configurable settings,
    // the libcurl handle and the Elasticsearch bulk lines that are waiting to be sent.
43  public:
    // Constructor initializer and body: stores the owning plugin in _self and creates
    // the libcurl easy handle.
45  : _self( _plugin )
46  { curl = curl_easy_init(); }
    // Releases the libcurl handle (see the out-of-class definition).
47  virtual ~es_objects_plugin_impl();
48 
    // Queues bulk entries for the given object ids ("delete" removes, anything else indexes)
    // and sends them once enough lines are pending; returns false when the bulk send fails.
49  bool index_database(const vector<object_id_type>& ids, std::string action);
    // Indexes the accounts, assets and balances present at genesis; throws on a failed send.
50  bool genesis();
    // Queues a bulk "delete" action for object `id` in the index named <prefix><index>.
51  void remove_from_database(object_id_type id, std::string index);
52 
    // Settings and their defaults; plugin_initialize() overwrites each one when the matching
    // es-objects-* option is supplied.
54  std::string _es_objects_elasticsearch_url = "http://localhost:9200/";
55  std::string _es_objects_auth = "";
56  uint32_t _es_objects_bulk_replay = 10000;
57  uint32_t _es_objects_bulk_sync = 100;
58  bool _es_objects_proposals = true;
59  bool _es_objects_accounts = true;
60  bool _es_objects_assets = true;
61  bool _es_objects_balances = true;
64  std::string _es_objects_index_prefix = "objects-";
66  CURL *curl; // libcurl easy handle: created in the constructor, cleaned up in the destructor
    // Bulk lines accumulated so far and not yet sent to Elasticsearch.
67  vector <std::string> bulk;
    // Scratch buffer for the lines of a single entry before they are moved into `bulk`.
68  vector<std::string> prepare;
69 
71 
    // Written into every indexed document as its "block_number" field.
72  uint32_t block_number;
74 
75  private:
    // Converts one blockchain object into a bulk entry for index <prefix><index_name>
    // and appends it to `bulk`.
76  template<typename T>
77  void prepareTemplate(T blockchain_object, string index_name);
78 };
79 
81 {
    // Body of es_objects_plugin_impl::genesis() (the signature line is absent from this listing).
    // Walks the account, asset and balance indexes, queues one bulk entry per object through
    // prepareTemplate(), then pushes everything to Elasticsearch in a single bulk request.
    // A failed send throws plugin_exception, so the only value ever returned is true.
82  ilog("elasticsearch OBJECTS: inserting data from genesis");
83 
85 
88 
    // NOTE(review): (1, 2) is presumably the space/type id pair of account objects - confirm.
90  auto &index_accounts = db.get_index(1, 2);
91  index_accounts.inspect_all_objects([this, &db](const graphene::db::object &o) {
    // The pointer returned by find_object() is dereferenced below without a null check.
92  auto obj = db.find_object(o.id);
93  auto a = static_cast<const account_object *>(obj);
94  prepareTemplate<account_object>(*a, "account");
95  });
96  }
97  if (_es_objects_assets) {
    // NOTE(review): (1, 3) is presumably the space/type id pair of asset objects - confirm.
98  auto &index_assets = db.get_index(1, 3);
99  index_assets.inspect_all_objects([this, &db](const graphene::db::object &o) {
100  auto obj = db.find_object(o.id);
101  auto a = static_cast<const asset_object *>(obj);
102  prepareTemplate<asset_object>(*a, "asset");
103  });
104  }
105  if (_es_objects_balances) {
    // NOTE(review): (2, 5) is presumably the space/type id pair of account balance objects - confirm.
106  auto &index_balances = db.get_index(2, 5);
107  index_balances.inspect_all_objects([this, &db](const graphene::db::object &o) {
108  auto obj = db.find_object(o.id);
109  auto b = static_cast<const account_balance_object *>(obj);
110  prepareTemplate<account_balance_object>(*b, "balance");
111  });
112  }
113 
    // Fill in the request: the shared curl handle, a copy of the pending bulk lines, and the credentials.
115  es.curl = curl;
116  es.bulk_lines = bulk;
118  es.auth = _es_objects_auth;
    // The pending lines are dropped only after a successful send.
119  if (!graphene::utilities::SendBulk(std::move(es)))
120  FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error inserting genesis data.");
121  else
122  bulk.clear();
123 
124  return true;
125 }
126 
// Queues Elasticsearch bulk entries for every id in `ids` whose object type is enabled in the
// configuration, then sends the accumulated lines once their count reaches the current limit.
//   ids    - ids of the objects that were created, changed or removed.
//   action - "delete" queues a delete action; any other value (callers pass "create" and
//            "update") queues an index entry built from the object's current state.
// Returns false when the bulk send fails, true otherwise (including when nothing was sent).
127 bool es_objects_plugin_impl::index_database(const vector<object_id_type>& ids, std::string action)
128 {
130 
133 
135 
136  // check if we are in replay or in sync and change number of bulk documents accordingly
137  uint32_t limit_documents = 0;
139  limit_documents = _es_objects_bulk_sync;
140  else
141  limit_documents = _es_objects_bulk_replay;
142 
143 
    // One branch per supported object type. Each branch looks the object up in the database and
    // silently skips ids for which find_object() returns nullptr.
144  for (auto const &value: ids) {
145  if (value.is<proposal_object>() && _es_objects_proposals) {
146  auto obj = db.find_object(value);
147  auto p = static_cast<const proposal_object *>(obj);
148  if (p != nullptr) {
149  if (action == "delete")
150  remove_from_database(p->id, "proposal");
151  else
152  prepareTemplate<proposal_object>(*p, "proposal");
153  }
154  } else if (value.is<account_object>() && _es_objects_accounts) {
155  auto obj = db.find_object(value);
156  auto a = static_cast<const account_object *>(obj);
157  if (a != nullptr) {
158  if (action == "delete")
159  remove_from_database(a->id, "account");
160  else
161  prepareTemplate<account_object>(*a, "account");
162  }
163  } else if (value.is<asset_object>() && _es_objects_assets) {
164  auto obj = db.find_object(value);
165  auto a = static_cast<const asset_object *>(obj);
166  if (a != nullptr) {
167  if (action == "delete")
168  remove_from_database(a->id, "asset");
169  else
170  prepareTemplate<asset_object>(*a, "asset");
171  }
172  } else if (value.is<account_balance_object>() && _es_objects_balances) {
173  auto obj = db.find_object(value);
174  auto b = static_cast<const account_balance_object *>(obj);
175  if (b != nullptr) {
176  if (action == "delete")
177  remove_from_database(b->id, "balance");
178  else
179  prepareTemplate<account_balance_object>(*b, "balance");
180  }
181  } else if (value.is<limit_order_object>() && _es_objects_limit_orders) {
182  auto obj = db.find_object(value);
183  auto l = static_cast<const limit_order_object *>(obj);
184  if (l != nullptr) {
185  if (action == "delete")
186  remove_from_database(l->id, "limitorder");
187  else
188  prepareTemplate<limit_order_object>(*l, "limitorder");
189  }
190  } else if (value.is<asset_bitasset_data_object>() && _es_objects_asset_bitasset) {
191  auto obj = db.find_object(value);
192  auto ba = static_cast<const asset_bitasset_data_object *>(obj);
193  if (ba != nullptr) {
194  if (action == "delete")
195  remove_from_database(ba->id, "bitasset");
196  else
197  prepareTemplate<asset_bitasset_data_object>(*ba, "bitasset");
198  }
199  }
200  }
201 
202  if (curl && bulk.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearch
203 
    // Fill in the request: the shared curl handle, a copy of the pending bulk lines, and the credentials.
205  es.curl = curl;
206  es.bulk_lines = bulk;
208  es.auth = _es_objects_auth;
209 
    // On failure the pending lines are kept in `bulk` and the caller is told via `false`.
210  if (!graphene::utilities::SendBulk(std::move(es)))
211  return false;
212  else
213  bulk.clear();
214  }
    // Closes a block whose opening line is absent from this listing.
215  }
216 
217  return true;
218 }
219 
// Queues a bulk "delete" action for one object.
//   id    - id of the object; its string form becomes the document "_id".
//   index - index name suffix; the target index is _es_objects_index_prefix + index.
// Nothing is sent from here: the line is only appended to `bulk`.
220 void es_objects_plugin_impl::remove_from_database( object_id_type id, std::string index)
221 {
    // The condition guarding this block is on a line that is absent from this listing.
223  {
224  fc::mutable_variant_object delete_line;
225  delete_line["_id"] = string(id);
226  delete_line["_index"] = _es_objects_index_prefix + index;
227  delete_line["_type"] = "data";
    // Wrap the metadata as {"delete": {...}}.
228  fc::mutable_variant_object final_delete_line;
229  final_delete_line["delete"] = delete_line;
    // Stage the JSON line in `prepare`, move it to the end of `bulk`, then reset `prepare`.
230  prepare.push_back(fc::json::to_string(final_delete_line));
231  std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk));
232  prepare.clear();
233  }
234 }
235 
// Builds the bulk entry for one blockchain object and appends its lines to `bulk`.
//   blockchain_object - the object to index; taken by value, so every call copies it.
//   index_name        - index name suffix; the target index is _es_objects_index_prefix + index_name.
236 template<typename T>
237 void es_objects_plugin_impl::prepareTemplate(T blockchain_object, string index_name)
238 {
    // Bulk header: target index and document type.
239  fc::mutable_variant_object bulk_header;
240  bulk_header["_index"] = _es_objects_index_prefix + index_name;
241  bulk_header["_type"] = "data";
    // "_id" is set only inside this block; its guarding condition is on a line absent from this listing.
243  {
244  bulk_header["_id"] = string(blockchain_object.id);
245  }
246 
    // Convert the object to a variant and pass it through adaptor_struct::adapt().
247  adaptor_struct adaptor;
248  fc::variant blockchain_object_variant;
249  fc::to_variant( blockchain_object, blockchain_object_variant, GRAPHENE_NET_MAX_NESTED_OBJECTS );
250  fc::mutable_variant_object o = adaptor.adapt(blockchain_object_variant.get_object());
251 
    // Extra fields added to every document.
252  o["object_id"] = string(blockchain_object.id);
253  o["block_time"] = block_time;
254  o["block_number"] = block_number;
255 
257 
    // `data` is defined on a line absent from this listing (presumably the JSON form of `o` - confirm).
    // createBulk() returns the lines of the entry; they are moved to the end of `bulk` and the
    // scratch buffer is reset.
258  prepare = graphene::utilities::createBulk(bulk_header, std::move(data));
259  std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk));
260  prepare.clear();
261 }
262 
264 {
    // Body of what is presumably es_objects_plugin_impl::~es_objects_plugin_impl() (signature line
    // absent from this listing): releases the libcurl handle if one was created and nulls the pointer.
265  if (curl) {
266  curl_easy_cleanup(curl);
267  curl = nullptr;
268  }
269  return;
270 }
271 
272 } // end namespace detail
273 
275  my( new detail::es_objects_plugin_impl(*this) )
276 {
    // Nothing else to do: the member initializer above creates the private implementation
    // object and hands it this plugin.
277 }
278 
280 {
    // Intentionally empty. Presumably the es_objects_plugin destructor (signature line absent
    // from this listing) - confirm.
281 }
282 
284 {
    // Body of plugin_name(): the name under which this plugin is known.
285  return "es_objects";
286 }
288 {
    // Body of plugin_description(): a one-line, human-readable summary of the plugin.
289  return "Stores blockchain objects in ES database. Experimental.";
290 }
291 
293  boost::program_options::options_description& cli,
294  boost::program_options::options_description& cfg
295  )
296 {
    // Registers every es-objects-* option on `cli` and then adds the same set to `cfg`, so each
    // option is accepted both on the command line and in the configuration file.
    // The value in parentheses at the end of each help text is the advertised default.
    // No default_value() is attached here; plugin_initialize() only overrides a setting when
    // its option is actually present.
297  cli.add_options()
298  ("es-objects-elasticsearch-url", boost::program_options::value<std::string>(),
299  "Elasticsearch node url(http://localhost:9200/)")
300  ("es-objects-auth", boost::program_options::value<std::string>(), "Basic auth username:password('')")
301  ("es-objects-bulk-replay", boost::program_options::value<uint32_t>(),
302  "Number of bulk documents to index on replay(10000)")
303  ("es-objects-bulk-sync", boost::program_options::value<uint32_t>(),
304  "Number of bulk documents to index on a synchronized chain(100)")
305  ("es-objects-proposals", boost::program_options::value<bool>(), "Store proposal objects(true)")
306  ("es-objects-accounts", boost::program_options::value<bool>(), "Store account objects(true)")
307  ("es-objects-assets", boost::program_options::value<bool>(), "Store asset objects(true)")
308  ("es-objects-balances", boost::program_options::value<bool>(), "Store balances objects(true)")
309  ("es-objects-limit-orders", boost::program_options::value<bool>(), "Store limit order objects(false)")
310  ("es-objects-asset-bitasset", boost::program_options::value<bool>(), "Store feed data(true)")
311  ("es-objects-index-prefix", boost::program_options::value<std::string>(),
312  "Add a prefix to the index(objects-)")
313  ("es-objects-keep-only-current", boost::program_options::value<bool>(),
314  "Keep only current state of the objects(true)")
315  ("es-objects-start-es-after-block", boost::program_options::value<uint32_t>(),
316  "Start doing ES job after block(0)")
317  ;
318  cfg.add(cli);
319 }
320 
321 void es_objects_plugin::plugin_initialize(const boost::program_options::variables_map& options)
322 {
323  if (options.count("es-objects-elasticsearch-url")) {
324  my->_es_objects_elasticsearch_url = options["es-objects-elasticsearch-url"].as<std::string>();
325  }
326  if (options.count("es-objects-auth")) {
327  my->_es_objects_auth = options["es-objects-auth"].as<std::string>();
328  }
329  if (options.count("es-objects-bulk-replay")) {
330  my->_es_objects_bulk_replay = options["es-objects-bulk-replay"].as<uint32_t>();
331  }
332  if (options.count("es-objects-bulk-sync")) {
333  my->_es_objects_bulk_sync = options["es-objects-bulk-sync"].as<uint32_t>();
334  }
335  if (options.count("es-objects-proposals")) {
336  my->_es_objects_proposals = options["es-objects-proposals"].as<bool>();
337  }
338  if (options.count("es-objects-accounts")) {
339  my->_es_objects_accounts = options["es-objects-accounts"].as<bool>();
340  }
341  if (options.count("es-objects-assets")) {
342  my->_es_objects_assets = options["es-objects-assets"].as<bool>();
343  }
344  if (options.count("es-objects-balances")) {
345  my->_es_objects_balances = options["es-objects-balances"].as<bool>();
346  }
347  if (options.count("es-objects-limit-orders")) {
348  my->_es_objects_limit_orders = options["es-objects-limit-orders"].as<bool>();
349  }
350  if (options.count("es-objects-asset-bitasset")) {
351  my->_es_objects_asset_bitasset = options["es-objects-asset-bitasset"].as<bool>();
352  }
353  if (options.count("es-objects-index-prefix")) {
354  my->_es_objects_index_prefix = options["es-objects-index-prefix"].as<std::string>();
355  }
356  if (options.count("es-objects-keep-only-current")) {
357  my->_es_objects_keep_only_current = options["es-objects-keep-only-current"].as<bool>();
358  }
359  if (options.count("es-objects-start-es-after-block")) {
360  my->_es_objects_start_es_after_block = options["es-objects-start-es-after-block"].as<uint32_t>();
361  }
362 
363  database().applied_block.connect([this](const signed_block &b) {
364  if(b.block_num() == 1 && my->_es_objects_start_es_after_block == 0) {
365  if (!my->genesis())
366  FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error populating genesis data.");
367  }
368  });
369  database().new_objects.connect([this]( const vector<object_id_type>& ids,
370  const flat_set<account_id_type>& impacted_accounts ) {
371  if(!my->index_database(ids, "create"))
372  {
373  FC_THROW_EXCEPTION(graphene::chain::plugin_exception,
374  "Error creating object from ES database, we are going to keep trying.");
375  }
376  });
377  database().changed_objects.connect([this]( const vector<object_id_type>& ids,
378  const flat_set<account_id_type>& impacted_accounts ) {
379  if(!my->index_database(ids, "update"))
380  {
381  FC_THROW_EXCEPTION(graphene::chain::plugin_exception,
382  "Error updating object from ES database, we are going to keep trying.");
383  }
384  });
385  database().removed_objects.connect([this](const vector<object_id_type>& ids,
386  const vector<const object*>& objs, const flat_set<account_id_type>& impacted_accounts) {
387  if(!my->index_database(ids, "delete"))
388  {
389  FC_THROW_EXCEPTION(graphene::chain::plugin_exception,
390  "Error deleting object from ES database, we are going to keep trying.");
391  }
392  });
393 }
394 
396 {
    // Body of plugin_startup() (signature line absent from this listing): fills in an ES request
    // with the curl handle, URL and credentials, and reports an error when Elasticsearch is not
    // reachable at the configured URL.
398  es.curl = my->curl;
399  es.elasticsearch_url = my->_es_objects_elasticsearch_url;
400  es.auth = my->_es_objects_auth;
    // NOTE(review): this second assignment to es.auth overwrites the credentials set on the
    // previous line with the index prefix. It looks like a different field was intended
    // (presumably an index-prefix member of ES) - confirm and fix.
401  es.auth = my->_es_objects_index_prefix;
402 
    // The condition guarding this throw is on a line absent from this listing.
404  FC_THROW_EXCEPTION(fc::exception, "ES database is not up in url ${url}", ("url", my->_es_objects_elasticsearch_url));
405  ilog("elasticsearch OBJECTS: plugin_startup() begin");
406 }
407 
408 } }
static string to_string(const variant &v, output_formatting format=stringify_large_ints_and_doubles, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
Definition: json.cpp:638
Tracks the balance of a single account/asset pair. This object is indexed on owner and asset_type so th...
virtual void plugin_initialize(const boost::program_options::variables_map &options) override
Perform early startup routines and register plugin indexes, callbacks, etc.
Definition: es_objects.cpp:321
fc::signal< void(const vector< object_id_type > &, const vector< const object * > &, const flat_set< account_id_type > &)> removed_objects
Definition: database.hpp:220
contains properties that only apply to bitassets (market issued assets)
const object * find_object(object_id_type id) const
fc::signal< void(const signed_block &)> applied_block
Definition: database.hpp:197
This class represents an account on the object graph. Accounts are the primary unit of authority on the...
tracks the blockchain state in an extensible manner
Definition: database.hpp:70
#define GRAPHENE_NET_MAX_NESTED_OBJECTS
Definition: config.hpp:110
void remove_from_database(object_id_type id, std::string index)
Definition: es_objects.cpp:220
virtual void inspect_all_objects(std::function< void(const object &)> inspector) const =0
Definition: api.cpp:56
fc::signal< void(const vector< object_id_type > &, const flat_set< account_id_type > &)> changed_objects
Definition: database.hpp:215
virtual void plugin_set_program_options(boost::program_options::options_description &cli, boost::program_options::options_description &cfg) override
Fill in command line parameters used by the plugin.
Definition: es_objects.cpp:292
Used to generate a useful error report when an exception is thrown. At each level in the stack where t...
Definition: exception.hpp:56
uint32_t head_block_num() const
Definition: db_getter.cpp:69
es_objects_plugin_impl(es_objects_plugin &_plugin)
Definition: es_objects.cpp:44
typename impl::zip< typename impl::make_sequence< length< List >()>::type, List >::type index
Definition: typelist.hpp:225
bool SendBulk(ES &&es)
variant_object & get_object()
Definition: variant.cpp:554
virtual void plugin_startup() override
Begin normal runtime operations.
Definition: es_objects.cpp:395
object_id_type id
Definition: object.hpp:69
void to_variant(const flat_set< T, A... > &var, variant &vo, uint32_t _max_depth)
Definition: flat.hpp:105
chain::database & database()
Definition: plugin.hpp:114
const index & get_index() const
const std::vector< std::string > createBulk(const fc::mutable_variant_object &bulk_header, std::string &&data)
microseconds seconds(int64_t s)
Definition: time.hpp:34
time_point_sec head_block_time() const
Definition: db_getter.cpp:64
#define ilog(FORMAT,...)
Definition: logger.hpp:117
std::unique_ptr< detail::es_objects_plugin_impl > my
Definition: es_objects.hpp:53
stores null, int64, uint64, double, bool, string, std::vector<variant>, and variant_object's.
Definition: variant.hpp:198
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
Definition: exception.hpp:378
bool checkES(ES &es)
tracks the parameters of an asset. All assets have a globally unique symbol name that controls how they...
uint32_t block_num() const
Definition: block.hpp:34
abstract base class for accessing objects indexed in various ways.
Definition: index.hpp:71
bool index_database(const vector< object_id_type > &ids, std::string action)
Definition: es_objects.cpp:127
static time_point now()
Definition: time.cpp:13
std::string plugin_description() const override
Definition: es_objects.cpp:287
base for all database objects
Definition: object.hpp:62
std::vector< std::string > bulk_lines
An order-preserving dictionary of variants.
an offer to sell an amount of an asset at a specified exchange rate by a certain time. This limit_order_o...
fc::signal< void(const vector< object_id_type > &, const flat_set< account_id_type > &)> new_objects
Definition: database.hpp:209
std::string plugin_name() const override
Definition: es_objects.cpp:283
tracks the approval of a partially approved transaction
fc::mutable_variant_object adapt(const variant_object &obj)
Definition: es_objects.hpp:57