BitShares-Core  7.0.0
BitShares blockchain node software and command-line wallet software
elasticsearch_plugin.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Cryptonomex, Inc., and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 
28 #include <graphene/chain/hardfork.hpp>
29 
30 #include <boost/algorithm/string.hpp>
31 
33 
34 namespace graphene { namespace elasticsearch {
35 
36 namespace detail
37 {
38 
40 {
41  public:
43  : _self( _plugin )
44  { }
45 
46  private:
48 
49  struct plugin_options
50  {
51  std::string elasticsearch_url = "http://localhost:9200/";
52  std::string auth = "";
53  uint32_t bulk_replay = 10000;
54  uint32_t bulk_sync = 100;
55 
56  std::string index_prefix = "bitshares-";
57 
59  uint16_t max_mapping_depth = 20;
60 
61  uint32_t start_es_after_block = 0;
62 
63  bool visitor = false;
64  bool operation_object = true;
65  bool operation_string = false;
66 
67  mode elasticsearch_mode = mode::only_save;
68 
69  void init(const boost::program_options::variables_map& options);
70  };
71 
72  void update_account_histories( const signed_block& b );
73 
75  {
76  return _self.database();
77  }
78 
79  elasticsearch_plugin& _self;
80  plugin_options _options;
81 
83 
84  uint32_t limit_documents = _options.bulk_replay;
85 
86  std::unique_ptr<graphene::utilities::es_client> es;
87 
88  vector <string> bulk_lines; // vector of op lines
89  size_t approximate_bulk_size = 0;
90 
91  bulk_struct bulk_line_struct;
92 
93  std::string index_name;
94  bool is_sync = false;
95  bool is_es_version_7_or_above = true;
96 
97  void add_elasticsearch( const account_id_type& account_id, const optional<operation_history_object>& oho,
98  uint32_t block_number );
99  void send_bulk( uint32_t block_num );
100 
101  void doOperationHistory(const optional <operation_history_object>& oho, operation_history_struct& os) const;
102  void doBlock(uint32_t trx_in_block, const signed_block& b, block_struct& bs) const;
103  void doVisitor(const optional <operation_history_object>& oho, visitor_struct& vs) const;
104  void checkState(const fc::time_point_sec& block_time);
105  void cleanObjects(const account_history_object& ath, const account_id_type& account_id);
106 
107  void init_program_options(const boost::program_options::variables_map& options);
108 };
109 
110 static std::string generateIndexName( const fc::time_point_sec& block_date,
111  const std::string& index_prefix )
112 {
113  auto block_date_string = block_date.to_iso_string();
114  std::vector<std::string> parts;
115  boost::split(parts, block_date_string, boost::is_any_of("-"));
116  std::string index_name = index_prefix + parts[0] + "-" + parts[1];
117  return index_name;
118 }
119 
120 void elasticsearch_plugin_impl::update_account_histories( const signed_block& b )
121 {
122  checkState(b.timestamp);
123  index_name = generateIndexName(b.timestamp, _options.index_prefix);
124 
125  graphene::chain::database& db = database();
126  const vector<optional< operation_history_object > >& hist = db.get_applied_operations();
127  bool is_first = true;
128  auto skip_oho_id = [&is_first,&db,this]() {
129  if( is_first && db._undo_db.enabled() ) // this ensures that the current id is rolled back on undo
130  {
131  db.remove( db.create<operation_history_object>( []( operation_history_object& obj) {} ) );
132  is_first = false;
133  }
134  else
135  _oho_index->use_next_id();
136  };
137  for( const optional< operation_history_object >& o_op : hist ) {
138  optional <operation_history_object> oho;
139 
140  auto create_oho = [&]() {
141  is_first = false;
142  return optional<operation_history_object>(
143  db.create<operation_history_object>([&](operation_history_object &h) {
144  if (o_op.valid())
145  {
146  h.op = o_op->op;
147  h.result = o_op->result;
148  h.block_num = o_op->block_num;
149  h.trx_in_block = o_op->trx_in_block;
150  h.op_in_trx = o_op->op_in_trx;
151  h.virtual_op = o_op->virtual_op;
152  h.is_virtual = o_op->is_virtual;
153  h.block_time = o_op->block_time;
154  }
155  }));
156  };
157 
158  if( !o_op.valid() ) {
159  skip_oho_id();
160  continue;
161  }
162  oho = create_oho();
163 
164  // populate what we can before impacted loop
165  if( o_op->block_num > _options.start_es_after_block )
166  {
167  bulk_line_struct.operation_type = oho->op.which();
168  bulk_line_struct.operation_id_num = oho->id.instance();
169  doOperationHistory( oho, bulk_line_struct.operation_history );
170  doBlock( oho->trx_in_block, b, bulk_line_struct.block_data );
171  if( _options.visitor )
172  doVisitor( oho, *bulk_line_struct.additional_data );
173  }
174 
175  const operation_history_object& op = *o_op;
176 
177  // get the set of accounts this operation applies to
178  flat_set<account_id_type> impacted;
179  vector<authority> other;
180  // fee_payer is added here
181  operation_get_required_authorities( op.op, impacted, impacted, other,
182  MUST_IGNORE_CUSTOM_OP_REQD_AUTHS( db.head_block_time() ) );
183 
184  if( op.op.is_type< account_create_operation >() )
185  impacted.insert( account_id_type( op.result.get<object_id_type>() ) );
186 
187  // https://github.com/bitshares/bitshares-core/issues/265
188  if( HARDFORK_CORE_265_PASSED(b.timestamp) || !op.op.is_type< account_create_operation >() )
189  {
190  operation_get_impacted_accounts( op.op, impacted,
191  MUST_IGNORE_CUSTOM_OP_REQD_AUTHS( db.head_block_time() ) );
192  }
193 
194  if( op.result.is_type<extendable_operation_result>() )
195  {
196  const auto& op_result = op.result.get<extendable_operation_result>();
197  if( op_result.value.impacted_accounts.valid() )
198  {
199  for( const auto& a : *op_result.value.impacted_accounts )
200  impacted.insert( a );
201  }
202  }
203 
204  for( const auto& a : other )
205  for( const auto& item : a.account_auths )
206  impacted.insert( item.first );
207 
208  for( const auto& account_id : impacted )
209  {
210  // Note: we send bulk if there are too many items in bulk_lines
211  add_elasticsearch( account_id, oho, b.block_num() );
212  }
213 
214  }
215 
216  // we send bulk at end of block when we are in sync for better real time client experience
217  if( is_sync && !bulk_lines.empty() )
218  send_bulk( b.block_num() );
219 
220 }
221 
222 void elasticsearch_plugin_impl::send_bulk( uint32_t block_num )
223 {
224  ilog( "Sending ${n} lines of bulk data to ElasticSearch at block ${b}, approximate size ${s}",
225  ("n",bulk_lines.size())("b",block_num)("s",approximate_bulk_size) );
226  if( !es->send_bulk( bulk_lines ) )
227  {
228  elog( "Error sending ${n} lines of bulk data to ElasticSearch, the first lines are:",
229  ("n",bulk_lines.size()) );
230  const auto log_max = std::min( bulk_lines.size(), size_t(10) );
231  for( size_t i = 0; i < log_max; ++i )
232  {
233  edump( (bulk_lines[i]) );
234  }
235  FC_THROW_EXCEPTION( graphene::chain::plugin_exception,
236  "Error populating ES database, we are going to keep trying." );
237  }
238  bulk_lines.clear();
239  approximate_bulk_size = 0;
240  bulk_lines.reserve(limit_documents);
241 }
242 
243 void elasticsearch_plugin_impl::checkState(const fc::time_point_sec& block_time)
244 {
245  if((fc::time_point::now() - block_time) < fc::seconds(30))
246  {
247  limit_documents = _options.bulk_sync;
248  is_sync = true;
249  }
250  else
251  {
252  limit_documents = _options.bulk_replay;
253  is_sync = false;
254  }
255  bulk_lines.reserve(limit_documents);
256 }
257 
259 {
260  using result_type = account_id_type;
261 
262  template<typename OpType>
263  account_id_type operator()(const OpType& op) const
264  {
265  return op.fee_payer();
266  }
267 };
268 
269 void elasticsearch_plugin_impl::doOperationHistory( const optional <operation_history_object>& oho,
270  operation_history_struct& os ) const
271 { try {
272  os.trx_in_block = oho->trx_in_block;
273  os.op_in_trx = oho->op_in_trx;
274  os.virtual_op = oho->virtual_op;
275  os.is_virtual = oho->is_virtual;
276  os.fee_payer = oho->op.visit( get_fee_payer_visitor() );
277 
278  if(_options.operation_string)
279  os.op = fc::json::to_string(oho->op);
280 
282 
283  if(_options.operation_object) {
284  constexpr uint16_t current_depth = 2;
285  // op
288  _options.max_mapping_depth - current_depth );
289  // operation_result
290  variant v;
293  _options.max_mapping_depth - current_depth );
294  }
295 } FC_CAPTURE_LOG_AND_RETHROW( (oho) ) } // GCOVR_EXCL_LINE
296 
297 void elasticsearch_plugin_impl::doBlock(uint32_t trx_in_block, const signed_block& b, block_struct& bs) const
298 {
299  std::string trx_id = "";
300  if(trx_in_block < b.transactions.size())
301  trx_id = b.transactions[trx_in_block].id().str();
302  bs.block_num = b.block_num();
303  bs.block_time = b.timestamp;
304  bs.trx_id = trx_id;
305 }
306 
308 {
309  using result_type = void;
310 
312  asset_id_type fee_asset;
313 
314  asset_id_type transfer_asset_id;
316  account_id_type transfer_from;
317  account_id_type transfer_to;
318 
320  {
321  fee_asset = o.fee.asset_id;
322  fee_amount = o.fee.amount;
323 
324  transfer_asset_id = o.amount.asset_id;
325  transfer_amount = o.amount.amount;
326  transfer_from = o.from;
327  transfer_to = o.to;
328  }
329 
330  object_id_type fill_order_id;
331  account_id_type fill_account_id;
332  asset_id_type fill_pays_asset_id;
334  asset_id_type fill_receives_asset_id;
338 
340  {
341  fee_asset = o.fee.asset_id;
342  fee_amount = o.fee.amount;
343 
344  fill_order_id = o.order_id;
345  fill_account_id = o.account_id;
346  fill_pays_asset_id = o.pays.asset_id;
347  fill_pays_amount = o.pays.amount;
348  fill_receives_asset_id = o.receives.asset_id;
349  fill_receives_amount = o.receives.amount;
350  fill_fill_price = o.fill_price.to_real();
351  fill_is_maker = o.is_maker;
352  }
353 
354  template<typename T>
355  void operator()( const T& o )
356  {
357  fee_asset = o.fee.asset_id;
358  fee_amount = o.fee.amount;
359  }
360 };
361 
362 void elasticsearch_plugin_impl::doVisitor(const optional <operation_history_object>& oho, visitor_struct& vs) const
363 {
364  const graphene::chain::database& db = _self.database();
365 
366  operation_visitor o_v;
367  oho->op.visit(o_v);
368 
369  auto fee_asset = o_v.fee_asset(db);
370  vs.fee_data.asset = o_v.fee_asset;
371  vs.fee_data.asset_name = fee_asset.symbol;
372  vs.fee_data.amount = o_v.fee_amount;
373  vs.fee_data.amount_units = (o_v.fee_amount.value)/(double)asset::scaled_precision(fee_asset.precision).value;
374 
375  auto transfer_asset = o_v.transfer_asset_id(db);
376  vs.transfer_data.asset = o_v.transfer_asset_id;
377  vs.transfer_data.asset_name = transfer_asset.symbol;
378  vs.transfer_data.amount = o_v.transfer_amount;
379  vs.transfer_data.amount_units = (o_v.transfer_amount.value)
380  / (double)asset::scaled_precision(transfer_asset.precision).value;
381  vs.transfer_data.from = o_v.transfer_from;
382  vs.transfer_data.to = o_v.transfer_to;
383 
384  auto fill_pays_asset = o_v.fill_pays_asset_id(db);
385  auto fill_receives_asset = o_v.fill_receives_asset_id(db);
386  vs.fill_data.order_id = o_v.fill_order_id;
387  vs.fill_data.account_id = o_v.fill_account_id;
388  vs.fill_data.pays_asset_id = o_v.fill_pays_asset_id;
389  vs.fill_data.pays_asset_name = fill_pays_asset.symbol;
390  vs.fill_data.pays_amount = o_v.fill_pays_amount;
391  vs.fill_data.pays_amount_units = (o_v.fill_pays_amount.value)
392  / (double)asset::scaled_precision(fill_pays_asset.precision).value;
393  vs.fill_data.receives_asset_id = o_v.fill_receives_asset_id;
394  vs.fill_data.receives_asset_name = fill_receives_asset.symbol;
395  vs.fill_data.receives_amount = o_v.fill_receives_amount;
396  vs.fill_data.receives_amount_units = (o_v.fill_receives_amount.value)
397  / (double)asset::scaled_precision(fill_receives_asset.precision).value;
398 
399  auto fill_price = (o_v.fill_receives_amount.value
400  / (double)asset::scaled_precision(fill_receives_asset.precision).value)
401  / (o_v.fill_pays_amount.value
402  / (double)asset::scaled_precision(fill_pays_asset.precision).value);
403  vs.fill_data.fill_price_units = fill_price;
404  vs.fill_data.fill_price = o_v.fill_fill_price;
405  vs.fill_data.is_maker = o_v.fill_is_maker;
406 }
407 
408 void elasticsearch_plugin_impl::add_elasticsearch( const account_id_type& account_id,
409  const optional<operation_history_object>& oho,
410  uint32_t block_number )
411 {
412  graphene::chain::database& db = database();
413 
414  const auto &stats_obj = db.get_account_stats_by_owner( account_id );
415 
416  const auto &ath = db.create<account_history_object>(
417  [&oho,&account_id,&stats_obj]( account_history_object &obj ) {
418  obj.operation_id = oho->id;
419  obj.account = account_id;
420  obj.sequence = stats_obj.total_ops + 1;
421  obj.next = stats_obj.most_recent_op;
422  });
423 
424  db.modify( stats_obj, [&ath]( account_statistics_object &obj ) {
425  obj.most_recent_op = ath.id;
426  obj.total_ops = ath.sequence;
427  });
428 
429  if( block_number > _options.start_es_after_block )
430  {
431  bulk_line_struct.account_history = ath;
432 
433  auto bulk_line = fc::json::to_string(bulk_line_struct, fc::json::legacy_generator);
434 
435  fc::mutable_variant_object bulk_header;
436  bulk_header["_index"] = index_name;
437  if( !is_es_version_7_or_above )
438  bulk_header["_type"] = "_doc";
439  bulk_header["_id"] = std::string( ath.id );
440  auto prepare = graphene::utilities::createBulk(bulk_header, std::move(bulk_line));
441  std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk_lines));
442 
443  approximate_bulk_size += bulk_lines.back().size();
444 
445  if( bulk_lines.size() >= limit_documents
446  || approximate_bulk_size >= graphene::utilities::es_client::request_size_threshold )
447  send_bulk( block_number );
448  }
449  cleanObjects(ath, account_id);
450 }
451 
452 void elasticsearch_plugin_impl::cleanObjects( const account_history_object& ath,
453  const account_id_type& account_id )
454 {
455  graphene::chain::database& db = database();
456  // remove everything except current object from ath
457  const auto &his_idx = db.get_index_type<account_history_index>();
458  const auto &by_seq_idx = his_idx.indices().get<by_seq>();
459  auto itr = by_seq_idx.lower_bound(boost::make_tuple(account_id, 0));
460  if (itr != by_seq_idx.end() && itr->account == account_id && itr->id != ath.id) {
461  // if found, remove the entry
462  const auto remove_op_id = itr->operation_id;
463  const auto itr_remove = itr;
464  ++itr;
465  db.remove( *itr_remove );
466  // modify previous node's next pointer
467  // this should be always true, but just have a check here
468  if( itr != by_seq_idx.end() && itr->account == account_id )
469  {
470  db.modify( *itr, []( account_history_object& obj ){
471  obj.next = account_history_id_type();
472  });
473  }
474  // do the same on oho
475  const auto &by_opid_idx = his_idx.indices().get<by_opid>();
476  if (by_opid_idx.find(remove_op_id) == by_opid_idx.end()) {
477  db.remove(remove_op_id(db));
478  }
479  }
480 }
481 
482 } // end namespace detail
483 
484 elasticsearch_plugin::elasticsearch_plugin(graphene::app::application& app) :
485  plugin(app),
486  my( std::make_unique<detail::elasticsearch_plugin_impl>(*this) )
487 {
488  // Nothing else to do
489 }
490 
492 
494 {
495  return "elasticsearch";
496 }
498 {
499  return "Stores account history data in elasticsearch database(EXPERIMENTAL).";
500 }
501 
503  boost::program_options::options_description& cli,
504  boost::program_options::options_description& cfg
505  )
506 {
507  cli.add_options()
508  ("elasticsearch-node-url", boost::program_options::value<std::string>(),
509  "Elastic Search database node url(http://localhost:9200/)")
510  ("elasticsearch-basic-auth", boost::program_options::value<std::string>(),
511  "Pass basic auth to elasticsearch database('')")
512  ("elasticsearch-bulk-replay", boost::program_options::value<uint32_t>(),
513  "Number of bulk documents to index on replay(10000)")
514  ("elasticsearch-bulk-sync", boost::program_options::value<uint32_t>(),
515  "Number of bulk documents to index on a syncronied chain(100)")
516  ("elasticsearch-index-prefix", boost::program_options::value<std::string>(),
517  "Add a prefix to the index(bitshares-)")
518  ("elasticsearch-max-mapping-depth", boost::program_options::value<uint16_t>(),
519  "The maximum index mapping depth (index.mapping.depth.limit) setting in ES, "
520  "should be >=2. (20)")
521  ("elasticsearch-start-es-after-block", boost::program_options::value<uint32_t>(),
522  "Start doing ES job after block(0)")
523  ("elasticsearch-visitor", boost::program_options::value<bool>(),
524  "Use visitor to index additional data(slows down the replay(false))")
525  ("elasticsearch-operation-object", boost::program_options::value<bool>(),
526  "Save operation as object(true)")
527  ("elasticsearch-operation-string", boost::program_options::value<bool>(),
528  "Save operation as string. Needed to serve history api calls(false)")
529  ("elasticsearch-mode", boost::program_options::value<uint16_t>(),
530  "Mode of operation: only_save(0), only_query(1), all(2) - Default: 0")
531  ;
532  cfg.add(cli);
533 }
534 
535 void detail::elasticsearch_plugin_impl::init_program_options(const boost::program_options::variables_map& options)
536 {
537  _options.init( options );
538 
539  if( _options.visitor )
540  bulk_line_struct.additional_data = visitor_struct();
541 
542  es = std::make_unique<graphene::utilities::es_client>( _options.elasticsearch_url, _options.auth );
543 
544  FC_ASSERT( es->check_status(), "ES database is not up in url ${url}", ("url", _options.elasticsearch_url) );
545 
546  es->check_version_7_or_above( is_es_version_7_or_above );
547 }
548 
549 void detail::elasticsearch_plugin_impl::plugin_options::init(const boost::program_options::variables_map& options)
550 {
551  utilities::get_program_option( options, "elasticsearch-node-url", elasticsearch_url );
552  utilities::get_program_option( options, "elasticsearch-basic-auth", auth );
553  utilities::get_program_option( options, "elasticsearch-bulk-replay", bulk_replay );
554  utilities::get_program_option( options, "elasticsearch-bulk-sync", bulk_sync );
555  utilities::get_program_option( options, "elasticsearch-index-prefix", index_prefix );
556  utilities::get_program_option( options, "elasticsearch-max-mapping-depth", max_mapping_depth );
557  utilities::get_program_option( options, "elasticsearch-start-es-after-block", start_es_after_block );
558  utilities::get_program_option( options, "elasticsearch-visitor", visitor );
559  utilities::get_program_option( options, "elasticsearch-operation-object", operation_object );
560  utilities::get_program_option( options, "elasticsearch-operation-string", operation_string );
561 
562  FC_ASSERT( max_mapping_depth >= 2, "The minimum value of elasticsearch-max-mapping-depth is 2" );
563 
564  auto es_mode = static_cast<uint16_t>( elasticsearch_mode );
565  utilities::get_program_option( options, "elasticsearch-mode", es_mode );
566  if( es_mode > static_cast<uint16_t>( mode::all ) )
567  FC_THROW_EXCEPTION( graphene::chain::plugin_exception, "Elasticsearch mode not valid" );
568  elasticsearch_mode = static_cast<mode>( es_mode );
569 
570  if( mode::all == elasticsearch_mode && !operation_string )
571  {
572  FC_THROW_EXCEPTION( graphene::chain::plugin_exception,
573  "If elasticsearch-mode is set to all then elasticsearch-operation-string need to be true");
574  }
575 }
576 
577 void elasticsearch_plugin::plugin_initialize(const boost::program_options::variables_map& options)
578 {
579  my->init_program_options( options );
580 
583 
584  if( my->_options.elasticsearch_mode != mode::only_query )
585  {
586  // connect with group 0 to process before some special steps (e.g. snapshot or next_object_id)
587  database().applied_block.connect( 0, [this](const signed_block &b) {
588  my->update_account_histories(b);
589  });
590  }
591 }
592 
594 {
595  // Nothing to do
596 }
597 
598 static operation_history_object fromEStoOperation(const variant& source)
599 {
601 
602  const auto operation_id = source["account_history"]["operation_id"];
603  fc::from_variant( operation_id, result.id, GRAPHENE_MAX_NESTED_OBJECTS );
604 
605  const auto op = fc::json::from_string(source["operation_history"]["op"].as_string());
607 
608  const auto operation_result = fc::json::from_string(source["operation_history"]["operation_result"].as_string());
610 
611  result.block_num = source["block_data"]["block_num"].as_uint64();
612  result.trx_in_block = source["operation_history"]["trx_in_block"].as_uint64();
613  result.op_in_trx = source["operation_history"]["op_in_trx"].as_uint64();
614  result.trx_in_block = source["operation_history"]["virtual_op"].as_uint64();
615  result.is_virtual = source["operation_history"]["is_virtual"].as_bool();
616 
617  result.block_time = fc::time_point_sec::from_iso_string( source["block_data"]["block_time"].as_string() );
618 
619  return result;
620 }
621 
622 operation_history_object elasticsearch_plugin::get_operation_by_id( const operation_history_id_type& id ) const
623 {
624  const string operation_id_string = std::string(object_id_type(id));
625 
626  const string query = R"(
627  {
628  "query": {
629  "match":
630  {
631  "account_history.operation_id": ")" + operation_id_string + R"("
632  }
633  }
634  }
635  )";
636 
637  const auto uri = my->_options.index_prefix + ( my->is_es_version_7_or_above ? "*/_search" : "*/_doc/_search" );
638  const auto response = my->es->query( uri, query );
639  variant variant_response = fc::json::from_string(response);
640  const auto source = variant_response["hits"]["hits"][size_t(0)]["_source"];
641  return fromEStoOperation(source);
642 }
643 
644 vector<operation_history_object> elasticsearch_plugin::get_account_history(
645  const account_id_type& account_id,
646  const operation_history_id_type& stop,
647  uint64_t limit,
648  const operation_history_id_type& start ) const
649 {
650  const auto account_id_string = std::string( account_id );
651 
652  const auto stop_number = stop.instance.value;
653  const auto start_number = start.instance.value;
654 
655  string range = "";
656  if(stop_number == 0)
657  range = " AND operation_id_num: ["+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]";
658  else if(stop_number > 0)
659  range = " AND operation_id_num: {"+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]";
660  // FIXME the code above is either redundant or buggy
661 
662  const string query = R"(
663  {
664  "size": )" + fc::to_string(limit) + R"(,
665  "sort" : [{ "operation_id_num" : {"order" : "desc"}}],
666  "query": {
667  "bool": {
668  "must": [
669  {
670  "query_string": {
671  "query": "account_history.account: )" + account_id_string + range + R"("
672  }
673  }
674  ]
675  }
676  }
677  }
678  )";
679 
680  vector<operation_history_object> result;
681 
682  if( !my->es->check_status() )
683  return result;
684 
685  const auto uri = my->_options.index_prefix + ( my->is_es_version_7_or_above ? "*/_search" : "*/_doc/_search" );
686  const auto response = my->es->query( uri, query );
687 
688  variant variant_response = fc::json::from_string(response);
689 
690  const auto hits = variant_response["hits"]["total"];
691  size_t size;
692  if( hits.is_object() ) // ES-7 ?
693  size = hits["value"].as_uint64();
694  else // probably ES-6
695  size = hits.as_uint64();
696  size = std::min( size, size_t(limit) );
697 
698  const auto& data = variant_response["hits"]["hits"];
699  for( size_t i=0; i<size; ++i )
700  {
701  const auto& source = data[i]["_source"];
702  result.push_back(fromEStoOperation(source));
703  }
704  return result;
705 }
706 
708 {
709  return my->_options.elasticsearch_mode;
710 }
711 
712 } }
graphene::utilities::es_data_adaptor::adapt
static fc::variant adapt(const fc::variant_object &op, uint16_t max_depth)
Definition: elasticsearch.cpp:264
graphene::elasticsearch::elasticsearch_plugin::plugin_set_program_options
void plugin_set_program_options(boost::program_options::options_description &cli, boost::program_options::options_description &cfg) override
Fill in command line parameters used by the plugin.
Definition: elasticsearch_plugin.cpp:502
FC_PACK_MAX_DEPTH
#define FC_PACK_MAX_DEPTH
Definition: config.hpp:3
graphene::elasticsearch::operation_history_struct::op_in_trx
uint16_t op_in_trx
Definition: elasticsearch_plugin.hpp:84
graphene::db::object::id
object_id_type id
Definition: object.hpp:69
graphene::protocol::transfer_operation::amount
asset amount
The amount of asset to transfer from from to to.
Definition: transfer.hpp:58
graphene::chain::database
tracks the blockchain state in an extensible manner
Definition: database.hpp:70
graphene::elasticsearch::fee_struct::asset_name
std::string asset_name
Definition: elasticsearch_plugin.hpp:102
graphene::elasticsearch::fee_struct::amount
share_type amount
Definition: elasticsearch_plugin.hpp:103
fc::variant::get_array
variants & get_array()
Definition: variant.cpp:496
graphene::elasticsearch::fill_struct::receives_amount_units
double receives_amount_units
Definition: elasticsearch_plugin.hpp:126
graphene::elasticsearch::detail::operation_visitor::fill_pays_asset_id
asset_id_type fill_pays_asset_id
Definition: elasticsearch_plugin.cpp:332
graphene::utilities::get_program_option
void get_program_option(const boost::program_options::variables_map &from, const std::string &key, T &to)
Definition: boost_program_options.hpp:30
graphene::protocol::fill_order_operation
Definition: market.hpp:206
graphene::elasticsearch::detail::get_fee_payer_visitor::result_type
account_id_type result_type
Definition: elasticsearch_plugin.cpp:260
graphene::elasticsearch::fill_struct::receives_asset_id
asset_id_type receives_asset_id
Definition: elasticsearch_plugin.hpp:123
fc::mutable_variant_object
An order-perserving dictionary of variant's.
Definition: variant_object.hpp:108
graphene::utilities::createBulk
std::vector< std::string > createBulk(const fc::mutable_variant_object &bulk_header, std::string &&data)
Definition: elasticsearch.cpp:69
fc::to_string
std::string to_string(double)
Definition: string.cpp:73
graphene::chain::operation_history_object::virtual_op
uint32_t virtual_op
Definition: operation_history_object.hpp:67
graphene::elasticsearch::detail::get_fee_payer_visitor::operator()
account_id_type operator()(const OpType &op) const
Definition: elasticsearch_plugin.cpp:263
graphene::chain::operation_history_object::op
operation op
Definition: operation_history_object.hpp:58
graphene::db::primary_index
Wraps a derived index to intercept calls to create, modify, and remove so that callbacks may be fired...
Definition: index.hpp:312
graphene::elasticsearch::elasticsearch_plugin::plugin_name
std::string plugin_name() const override
Get the name of the plugin.
Definition: elasticsearch_plugin.cpp:493
graphene::app::plugin::database
chain::database & database()
Definition: plugin.hpp:115
fc::static_variant< void_result, object_id_type, asset, generic_operation_result, generic_exchange_operation_result, extendable_operation_result >
impacted.hpp
fc::variant::as_uint64
uint64_t as_uint64() const
Definition: variant.cpp:398
graphene::elasticsearch::fill_struct::receives_amount
share_type receives_amount
Definition: elasticsearch_plugin.hpp:125
graphene::elasticsearch::transfer_struct::asset_name
std::string asset_name
Definition: elasticsearch_plugin.hpp:109
graphene::elasticsearch::operation_history_struct::fee_payer
account_id_type fee_payer
Definition: elasticsearch_plugin.hpp:87
boost_program_options.hpp
account_evaluator.hpp
graphene::protocol::asset::scaled_precision
static share_type scaled_precision(uint8_t precision)
Definition: asset.cpp:363
graphene::elasticsearch::fill_struct::pays_amount_units
double pays_amount_units
Definition: elasticsearch_plugin.hpp:122
graphene::protocol::fill_order_operation::fill_price
price fill_price
Definition: market.hpp:219
graphene::elasticsearch::detail::operation_visitor::transfer_asset_id
asset_id_type transfer_asset_id
Definition: elasticsearch_plugin.cpp:314
fc::time_point_sec::from_iso_string
static time_point_sec from_iso_string(const std::string &s)
Definition: time.cpp:35
graphene::elasticsearch::fill_struct::order_id
object_id_type order_id
Definition: elasticsearch_plugin.hpp:117
graphene::elasticsearch::transfer_struct::asset
asset_id_type asset
Definition: elasticsearch_plugin.hpp:108
graphene::elasticsearch::bulk_struct::operation_type
int64_t operation_type
Definition: elasticsearch_plugin.hpp:141
graphene::elasticsearch::operation_history_struct::operation_result_object
variant operation_result_object
Definition: elasticsearch_plugin.hpp:91
graphene::elasticsearch::elasticsearch_plugin::~elasticsearch_plugin
~elasticsearch_plugin() override
graphene::elasticsearch::fill_struct::pays_asset_name
std::string pays_asset_name
Definition: elasticsearch_plugin.hpp:120
graphene::chain::operation_history_object::trx_in_block
uint16_t trx_in_block
Definition: operation_history_object.hpp:63
graphene::chain::operation_history_object::op_in_trx
uint16_t op_in_trx
Definition: operation_history_object.hpp:65
graphene::elasticsearch::detail::operation_visitor::transfer_from
account_id_type transfer_from
Definition: elasticsearch_plugin.cpp:316
fc::json::from_string
static variant from_string(const string &utf8_str, parse_type ptype=legacy_parser, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
Definition: json.cpp:458
graphene::protocol::fill_order_operation::is_maker
bool is_maker
Definition: market.hpp:220
graphene::db::object_database::create
const T & create(F &&constructor)
Definition: object_database.hpp:63
graphene::elasticsearch::transfer_struct::to
account_id_type to
Definition: elasticsearch_plugin.hpp:113
graphene::elasticsearch::detail::operation_visitor::fill_pays_amount
share_type fill_pays_amount
Definition: elasticsearch_plugin.cpp:333
graphene::elasticsearch::visitor_struct::transfer_data
transfer_struct transfer_data
Definition: elasticsearch_plugin.hpp:134
graphene::chain::database::get_applied_operations
const vector< optional< operation_history_object > > & get_applied_operations() const
Definition: db_block.cpp:566
graphene::elasticsearch::detail::elasticsearch_plugin_impl::elasticsearch_plugin_impl
elasticsearch_plugin_impl(elasticsearch_plugin &_plugin)
Definition: elasticsearch_plugin.cpp:42
fc::from_variant
void from_variant(const variant &var, flat_set< T, A... > &vo, uint32_t _max_depth)
Definition: flat.hpp:116
graphene::chain::database::applied_block
fc::signal< void(const signed_block &)> applied_block
Definition: database.hpp:624
graphene::elasticsearch::visitor_struct::fill_data
fill_struct fill_data
Definition: elasticsearch_plugin.hpp:135
graphene::elasticsearch::elasticsearch_plugin::get_operation_by_id
operation_history_object get_operation_by_id(const operation_history_id_type &id) const
Definition: elasticsearch_plugin.cpp:622
graphene::protocol::transfer_operation::fee
asset fee
Definition: transfer.hpp:52
graphene::protocol::fill_order_operation::fee
asset fee
Definition: market.hpp:218
graphene::chain::account_history_index
generic_index< account_history_object, account_history_multi_idx_type > account_history_index
Definition: operation_history_object.hpp:165
FC_CAPTURE_LOG_AND_RETHROW
#define FC_CAPTURE_LOG_AND_RETHROW(...)
Definition: exception.hpp:415
fc::seconds
microseconds seconds(int64_t s)
Definition: time.hpp:34
graphene::elasticsearch::detail::operation_visitor::fill_is_maker
bool fill_is_maker
Definition: elasticsearch_plugin.cpp:337
fc::json::to_string
static string to_string(const variant &v, output_formatting format=stringify_large_ints_and_doubles, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
Definition: json.cpp:650
graphene::elasticsearch::bulk_struct::operation_history
operation_history_struct operation_history
Definition: elasticsearch_plugin.hpp:140
graphene::elasticsearch::fee_struct::amount_units
double amount_units
Definition: elasticsearch_plugin.hpp:104
graphene::elasticsearch::bulk_struct
Definition: elasticsearch_plugin.hpp:138
graphene::elasticsearch::visitor_struct::fee_data
fee_struct fee_data
Definition: elasticsearch_plugin.hpp:133
graphene::app::application
Definition: application.hpp:91
graphene::elasticsearch::detail::elasticsearch_plugin_impl
Definition: elasticsearch_plugin.cpp:39
graphene::utilities::es_client::request_size_threshold
static constexpr size_t request_size_threshold
When doing bulk operations, call send_bulk when the approximate size of pending data reaches this val...
Definition: elasticsearch.hpp:110
fc::variant::as_bool
bool as_bool() const
Definition: variant.cpp:441
graphene::elasticsearch::bulk_struct::additional_data
optional< visitor_struct > additional_data
Definition: elasticsearch_plugin.hpp:144
graphene::elasticsearch::detail::operation_visitor::fee_amount
share_type fee_amount
Definition: elasticsearch_plugin.cpp:311
graphene::elasticsearch::fill_struct::account_id
account_id_type account_id
Definition: elasticsearch_plugin.hpp:118
graphene::elasticsearch::detail::get_fee_payer_visitor
Definition: elasticsearch_plugin.cpp:258
graphene::elasticsearch::fill_struct::pays_amount
share_type pays_amount
Definition: elasticsearch_plugin.hpp:121
graphene::protocol::fill_order_operation::pays
asset pays
Definition: market.hpp:216
graphene::chain::operation_history_object::block_num
uint32_t block_num
Definition: operation_history_object.hpp:61
graphene::elasticsearch::fill_struct::is_maker
bool is_maker
Definition: elasticsearch_plugin.hpp:129
ilog
#define ilog(FORMAT,...)
Definition: logger.hpp:117
elasticsearch_plugin.hpp
edump
#define edump(SEQ)
Definition: logger.hpp:182
graphene::elasticsearch::operation_history_struct::op
std::string op
Definition: elasticsearch_plugin.hpp:88
graphene::db::object_database::add_index
IndexType * add_index()
Definition: object_database.hpp:144
graphene::elasticsearch::detail::operation_visitor::transfer_to
account_id_type transfer_to
Definition: elasticsearch_plugin.cpp:317
graphene::elasticsearch::detail::operation_visitor::fill_account_id
account_id_type fill_account_id
Definition: elasticsearch_plugin.cpp:331
graphene::elasticsearch::detail::operation_visitor::fill_receives_amount
share_type fill_receives_amount
Definition: elasticsearch_plugin.cpp:335
graphene::elasticsearch::mode::all
@ all
fc::time_point_sec
Definition: time.hpp:74
graphene::protocol::asset::asset_id
asset_id_type asset_id
Definition: asset.hpp:37
graphene::protocol::transfer_operation
Transfers an amount of one asset from one account to another.
Definition: transfer.hpp:45
fc::from_static_variant
Definition: static_variant.hpp:341
graphene::elasticsearch::transfer_struct::amount
share_type amount
Definition: elasticsearch_plugin.hpp:110
graphene::elasticsearch::detail::operation_visitor::fill_receives_asset_id
asset_id_type fill_receives_asset_id
Definition: elasticsearch_plugin.cpp:334
graphene::elasticsearch::detail::operation_visitor::operator()
void operator()(const graphene::chain::transfer_operation &o)
Definition: elasticsearch_plugin.cpp:319
fc::json::legacy_generator
@ legacy_generator
Definition: json.hpp:33
graphene::protocol::fill_order_operation::account_id
account_id_type account_id
Definition: market.hpp:215
graphene::elasticsearch::fill_struct::fill_price_units
double fill_price_units
Definition: elasticsearch_plugin.hpp:128
graphene::elasticsearch::operation_history_struct::operation_result
std::string operation_result
Definition: elasticsearch_plugin.hpp:89
fc::to_variant
void to_variant(const flat_set< T, A... > &var, variant &vo, uint32_t _max_depth)
Definition: flat.hpp:105
graphene::elasticsearch::visitor_struct
Definition: elasticsearch_plugin.hpp:132
graphene::elasticsearch::elasticsearch_plugin::plugin_initialize
void plugin_initialize(const boost::program_options::variables_map &options) override
Perform early startup routines and register plugin indexes, callbacks, etc.
Definition: elasticsearch_plugin.cpp:577
graphene::protocol::price::to_real
double to_real() const
Definition: asset.hpp:127
graphene::utilities::es_data_adaptor::adapt_static_variant
static fc::variant adapt_static_variant(const fc::variants &v, uint16_t max_depth)
Definition: elasticsearch.cpp:451
graphene::elasticsearch::elasticsearch_plugin::get_account_history
vector< operation_history_object > get_account_history(const account_id_type &account_id, const operation_history_id_type &stop=operation_history_id_type(), uint64_t limit=100, const operation_history_id_type &start=operation_history_id_type()) const
Definition: elasticsearch_plugin.cpp:644
graphene::protocol::extendable_operation_result
extension< extendable_operation_result_dtl > extendable_operation_result
Definition: base.hpp:113
graphene::protocol::transfer_operation::to
account_id_type to
Account to transfer asset to.
Definition: transfer.hpp:56
graphene::elasticsearch::operation_history_struct
Definition: elasticsearch_plugin.hpp:82
graphene::elasticsearch::fill_struct::fill_price
double fill_price
Definition: elasticsearch_plugin.hpp:127
graphene::elasticsearch::detail::operation_visitor::fee_asset
asset_id_type fee_asset
Definition: elasticsearch_plugin.cpp:312
graphene::elasticsearch::mode::only_query
@ only_query
graphene::elasticsearch::transfer_struct::from
account_id_type from
Definition: elasticsearch_plugin.hpp:112
graphene::protocol::transfer_operation::from
account_id_type from
Account to transfer asset from.
Definition: transfer.hpp:54
graphene::elasticsearch::operation_history_struct::op_object
variant op_object
Definition: elasticsearch_plugin.hpp:90
graphene::elasticsearch::bulk_struct::block_data
block_struct block_data
Definition: elasticsearch_plugin.hpp:143
graphene::protocol::operation_get_required_authorities
void operation_get_required_authorities(const operation &op, flat_set< account_id_type > &active, flat_set< account_id_type > &owner, vector< authority > &other, bool ignore_custom_operation_required_auths)
Definition: operations.cpp:103
graphene::elasticsearch::mode
mode
Definition: elasticsearch_plugin.hpp:53
fc::time_point::now
static time_point now()
Definition: time.cpp:13
FC_ASSERT
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
Definition: exception.hpp:345
graphene::elasticsearch::detail::operation_visitor::fill_order_id
object_id_type fill_order_id
Definition: elasticsearch_plugin.cpp:330
graphene::elasticsearch::elasticsearch_plugin
Definition: elasticsearch_plugin.hpp:55
graphene::elasticsearch::mode::only_save
@ only_save
graphene::elasticsearch::fill_struct::receives_asset_name
std::string receives_asset_name
Definition: elasticsearch_plugin.hpp:124
fc::variant::get_object
variant_object & get_object()
Definition: variant.cpp:554
fc::variant
stores null, int64, uint64, double, bool, string, std::vector<variant>, and variant_object's.
Definition: variant.hpp:198
graphene::chain::account_history_object
a node in a linked list of operation_history_objects
Definition: operation_history_object.hpp:95
graphene::db::object_id_type
Definition: object_id.hpp:30
graphene::protocol::asset::amount
share_type amount
Definition: asset.hpp:36
graphene::elasticsearch::operation_history_struct::trx_in_block
uint16_t trx_in_block
Definition: elasticsearch_plugin.hpp:83
std
Definition: zeroed_array.hpp:76
graphene::elasticsearch::detail::operation_visitor::transfer_amount
share_type transfer_amount
Definition: elasticsearch_plugin.cpp:315
graphene::elasticsearch::fill_struct::pays_asset_id
asset_id_type pays_asset_id
Definition: elasticsearch_plugin.hpp:119
graphene::elasticsearch::bulk_struct::operation_id_num
uint64_t operation_id_num
Definition: elasticsearch_plugin.hpp:142
graphene::chain::operation_get_impacted_accounts
void operation_get_impacted_accounts(const operation &op, flat_set< account_id_type > &result, bool ignore_custom_op_required_auths)
Definition: db_notify.cpp:403
graphene::elasticsearch::fee_struct::asset
asset_id_type asset
Definition: elasticsearch_plugin.hpp:101
graphene::chain::operation_history_object::result
operation_result result
Definition: operation_history_object.hpp:59
GRAPHENE_MAX_NESTED_OBJECTS
#define GRAPHENE_MAX_NESTED_OBJECTS
Definition: config.hpp:33
graphene::elasticsearch::transfer_struct::amount_units
double amount_units
Definition: elasticsearch_plugin.hpp:111
graphene::protocol::fill_order_operation::receives
asset receives
Definition: market.hpp:217
graphene::elasticsearch::detail::operation_visitor::fill_fill_price
double fill_fill_price
Definition: elasticsearch_plugin.cpp:336
graphene::elasticsearch::detail::operation_visitor::operator()
void operator()(const graphene::chain::fill_order_operation &o)
Definition: elasticsearch_plugin.cpp:339
graphene::elasticsearch::elasticsearch_plugin::get_running_mode
mode get_running_mode() const
Definition: elasticsearch_plugin.cpp:707
graphene::chain::database::get_account_stats_by_owner
const account_statistics_object & get_account_stats_by_owner(account_id_type owner) const
Definition: db_getter.cpp:142
graphene::elasticsearch::operation_history_struct::is_virtual
bool is_virtual
Definition: elasticsearch_plugin.hpp:86
fc::static_variant::visit
visitor::result_type visit(visitor &v)
Definition: static_variant.hpp:256
fc::optional
provides stack-based nullable value similar to boost::optional
Definition: optional.hpp:20
graphene::elasticsearch::detail::operation_visitor::operator()
void operator()(const T &o)
Definition: elasticsearch_plugin.cpp:355
graphene::chain::operation_history_object::block_time
time_point_sec block_time
Definition: operation_history_object.hpp:71
FC_THROW_EXCEPTION
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
Definition: exception.hpp:379
graphene::elasticsearch::operation_history_struct::virtual_op
uint32_t virtual_op
Definition: elasticsearch_plugin.hpp:85
graphene::protocol::signed_block
Definition: block.hpp:64
graphene::db::object_database::remove
void remove(const object &obj)
Definition: object_database.hpp:97
graphene::elasticsearch::elasticsearch_plugin::plugin_description
std::string plugin_description() const override
Get the description of the plugin.
Definition: elasticsearch_plugin.cpp:497
graphene::db::object_database::get_index_type
const IndexType & get_index_type() const
Definition: object_database.hpp:77
fc::safe::value
T value
Definition: safe.hpp:28
graphene::elasticsearch::elasticsearch_plugin::plugin_startup
void plugin_startup() override
Begin normal runtime operations.
Definition: elasticsearch_plugin.cpp:593
graphene::protocol::fill_order_operation::order_id
object_id_type order_id
Definition: market.hpp:214
graphene::chain::database::database
database()
Definition: db_management.cpp:44
graphene::elasticsearch::detail::operation_visitor
Definition: elasticsearch_plugin.cpp:307
graphene::elasticsearch::block_struct
Definition: elasticsearch_plugin.hpp:94
graphene
Definition: api.cpp:48
fc::time_point_sec::to_iso_string
std::string to_iso_string() const
Definition: time.cpp:24
graphene::elasticsearch::detail::operation_visitor::result_type
void result_type
Definition: elasticsearch_plugin.cpp:309
graphene::db::object_database::modify
void modify(const T &obj, const Lambda &m)
Definition: object_database.hpp:99
graphene::chain::operation_history_object
tracks the history of all logical operations on blockchain state
Definition: operation_history_object.hpp:48
graphene::chain::operation_history_object::is_virtual
bool is_virtual
Definition: operation_history_object.hpp:69
elog
#define elog(FORMAT,...)
Definition: logger.hpp:129
fc::safe
Definition: safe.hpp:26