BitShares-Core  6.1.0
BitShares blockchain implementation and command-line interface software
elasticsearch_plugin.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Cryptonomex, Inc., and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 
28 #include <graphene/chain/hardfork.hpp>
29 
30 #include <boost/algorithm/string.hpp>
31 
33 
34 namespace graphene { namespace elasticsearch {
35 
36 namespace detail
37 {
38 
40 {
41  public:
43  : _self( _plugin )
44  { }
45 
46  private:
48 
49  struct plugin_options
50  {
51  std::string elasticsearch_url = "http://localhost:9200/";
52  std::string auth = "";
53  uint32_t bulk_replay = 10000;
54  uint32_t bulk_sync = 100;
55 
56  std::string index_prefix = "bitshares-";
57 
59  uint16_t max_mapping_depth = 20;
60 
61  uint32_t start_es_after_block = 0;
62 
63  bool visitor = false;
64  bool operation_object = true;
65  bool operation_string = false;
66 
67  mode elasticsearch_mode = mode::only_save;
68 
69  void init(const boost::program_options::variables_map& options);
70  };
71 
72  void update_account_histories( const signed_block& b );
73 
75  {
76  return _self.database();
77  }
78 
79  elasticsearch_plugin& _self;
80  plugin_options _options;
81 
82  primary_index< operation_history_index >* _oho_index;
83 
84  uint32_t limit_documents = _options.bulk_replay;
85 
86  std::unique_ptr<graphene::utilities::es_client> es;
87 
88  vector <string> bulk_lines; // vector of op lines
89  size_t approximate_bulk_size = 0;
90 
91  bulk_struct bulk_line_struct;
92 
93  std::string index_name;
94  bool is_sync = false;
95  bool is_es_version_7_or_above = true;
96 
97  void add_elasticsearch( const account_id_type& account_id, const optional<operation_history_object>& oho,
98  uint32_t block_number );
99  void send_bulk( uint32_t block_num );
100 
101  void doOperationHistory(const optional <operation_history_object>& oho, operation_history_struct& os) const;
102  void doBlock(uint32_t trx_in_block, const signed_block& b, block_struct& bs) const;
103  void doVisitor(const optional <operation_history_object>& oho, visitor_struct& vs) const;
104  void checkState(const fc::time_point_sec& block_time);
105  void cleanObjects(const account_history_object& ath, const account_id_type& account_id);
106 
107  void init_program_options(const boost::program_options::variables_map& options);
108 };
109 
110 static std::string generateIndexName( const fc::time_point_sec& block_date,
111  const std::string& index_prefix )
112 {
113  auto block_date_string = block_date.to_iso_string();
114  std::vector<std::string> parts;
115  boost::split(parts, block_date_string, boost::is_any_of("-"));
116  std::string index_name = index_prefix + parts[0] + "-" + parts[1];
117  return index_name;
118 }
119 
120 void elasticsearch_plugin_impl::update_account_histories( const signed_block& b )
121 {
122  checkState(b.timestamp);
123  index_name = generateIndexName(b.timestamp, _options.index_prefix);
124 
126  const vector<optional< operation_history_object > >& hist = db.get_applied_operations();
127  bool is_first = true;
128  auto skip_oho_id = [&is_first,&db,this]() {
129  if( is_first && db._undo_db.enabled() ) // this ensures that the current id is rolled back on undo
130  {
131  db.remove( db.create<operation_history_object>( []( operation_history_object& obj) {} ) );
132  is_first = false;
133  }
134  else
135  _oho_index->use_next_id();
136  };
137  for( const optional< operation_history_object >& o_op : hist ) {
138  optional <operation_history_object> oho;
139 
140  auto create_oho = [&]() {
141  is_first = false;
142  return optional<operation_history_object>(
144  if (o_op.valid())
145  {
146  h.op = o_op->op;
147  h.result = o_op->result;
148  h.block_num = o_op->block_num;
149  h.trx_in_block = o_op->trx_in_block;
150  h.op_in_trx = o_op->op_in_trx;
151  h.virtual_op = o_op->virtual_op;
152  h.is_virtual = o_op->is_virtual;
153  h.block_time = o_op->block_time;
154  }
155  }));
156  };
157 
158  if( !o_op.valid() ) {
159  skip_oho_id();
160  continue;
161  }
162  oho = create_oho();
163 
164  // populate what we can before impacted loop
165  if( o_op->block_num > _options.start_es_after_block )
166  {
167  bulk_line_struct.operation_type = oho->op.which();
168  bulk_line_struct.operation_id_num = oho->id.instance();
169  doOperationHistory( oho, bulk_line_struct.operation_history );
170  doBlock( oho->trx_in_block, b, bulk_line_struct.block_data );
171  if( _options.visitor )
172  doVisitor( oho, *bulk_line_struct.additional_data );
173  }
174 
175  const operation_history_object& op = *o_op;
176 
177  // get the set of accounts this operation applies to
178  flat_set<account_id_type> impacted;
179  vector<authority> other;
180  // fee_payer is added here
181  operation_get_required_authorities( op.op, impacted, impacted, other,
182  MUST_IGNORE_CUSTOM_OP_REQD_AUTHS( db.head_block_time() ) );
183 
184  if( op.op.is_type< account_create_operation >() )
185  impacted.insert( account_id_type( op.result.get<object_id_type>() ) );
186 
187  // https://github.com/bitshares/bitshares-core/issues/265
188  if( HARDFORK_CORE_265_PASSED(b.timestamp) || !op.op.is_type< account_create_operation >() )
189  {
190  operation_get_impacted_accounts( op.op, impacted,
191  MUST_IGNORE_CUSTOM_OP_REQD_AUTHS( db.head_block_time() ) );
192  }
193 
195  {
196  const auto& op_result = op.result.get<extendable_operation_result>();
197  if( op_result.value.impacted_accounts.valid() )
198  {
199  for( const auto& a : *op_result.value.impacted_accounts )
200  impacted.insert( a );
201  }
202  }
203 
204  for( const auto& a : other )
205  for( const auto& item : a.account_auths )
206  impacted.insert( item.first );
207 
208  for( const auto& account_id : impacted )
209  {
210  // Note: we send bulk if there are too many items in bulk_lines
211  add_elasticsearch( account_id, oho, b.block_num() );
212  }
213 
214  }
215 
216  // we send bulk at end of block when we are in sync for better real time client experience
217  if( is_sync && !bulk_lines.empty() )
218  send_bulk( b.block_num() );
219 
220 }
221 
222 void elasticsearch_plugin_impl::send_bulk( uint32_t block_num )
223 {
224  ilog( "Sending ${n} lines of bulk data to ElasticSearch at block ${b}, approximate size ${s}",
225  ("n",bulk_lines.size())("b",block_num)("s",approximate_bulk_size) );
226  if( !es->send_bulk( bulk_lines ) )
227  {
228  elog( "Error sending ${n} lines of bulk data to ElasticSearch, the first lines are:",
229  ("n",bulk_lines.size()) );
230  const auto log_max = std::min( bulk_lines.size(), size_t(10) );
231  for( size_t i = 0; i < log_max; ++i )
232  {
233  edump( (bulk_lines[i]) );
234  }
235  FC_THROW_EXCEPTION( graphene::chain::plugin_exception,
236  "Error populating ES database, we are going to keep trying." );
237  }
238  bulk_lines.clear();
239  approximate_bulk_size = 0;
240  bulk_lines.reserve(limit_documents);
241 }
242 
243 void elasticsearch_plugin_impl::checkState(const fc::time_point_sec& block_time)
244 {
245  if((fc::time_point::now() - block_time) < fc::seconds(30))
246  {
247  limit_documents = _options.bulk_sync;
248  is_sync = true;
249  }
250  else
251  {
252  limit_documents = _options.bulk_replay;
253  is_sync = false;
254  }
255  bulk_lines.reserve(limit_documents);
256 }
257 
259 {
260  using result_type = account_id_type;
261 
262  template<typename OpType>
263  account_id_type operator()(const OpType& op) const
264  {
265  return op.fee_payer();
266  }
267 };
268 
269 void elasticsearch_plugin_impl::doOperationHistory( const optional <operation_history_object>& oho,
270  operation_history_struct& os ) const
271 { try {
272  os.trx_in_block = oho->trx_in_block;
273  os.op_in_trx = oho->op_in_trx;
274  os.virtual_op = oho->virtual_op;
275  os.fee_payer = oho->op.visit( get_fee_payer_visitor() );
276 
277  if(_options.operation_string)
278  os.op = fc::json::to_string(oho->op);
279 
280  os.operation_result = fc::json::to_string(oho->result);
281 
282  if(_options.operation_object) {
283  constexpr uint16_t current_depth = 2;
284  // op
287  _options.max_mapping_depth - current_depth );
288  // operation_result
289  variant v;
290  fc::to_variant( oho->result, v, FC_PACK_MAX_DEPTH );
292  _options.max_mapping_depth - current_depth );
293  }
294 } FC_CAPTURE_LOG_AND_RETHROW( (oho) ) }
295 
296 void elasticsearch_plugin_impl::doBlock(uint32_t trx_in_block, const signed_block& b, block_struct& bs) const
297 {
298  std::string trx_id = "";
299  if(trx_in_block < b.transactions.size())
300  trx_id = b.transactions[trx_in_block].id().str();
301  bs.block_num = b.block_num();
302  bs.block_time = b.timestamp;
303  bs.trx_id = trx_id;
304 }
305 
307 {
308  using result_type = void;
309 
311  asset_id_type fee_asset;
312 
313  asset_id_type transfer_asset_id;
315  account_id_type transfer_from;
316  account_id_type transfer_to;
317 
319  {
320  fee_asset = o.fee.asset_id;
321  fee_amount = o.fee.amount;
322 
323  transfer_asset_id = o.amount.asset_id;
324  transfer_amount = o.amount.amount;
325  transfer_from = o.from;
326  transfer_to = o.to;
327  }
328 
329  object_id_type fill_order_id;
330  account_id_type fill_account_id;
331  asset_id_type fill_pays_asset_id;
333  asset_id_type fill_receives_asset_id;
337 
339  {
340  fee_asset = o.fee.asset_id;
341  fee_amount = o.fee.amount;
342 
343  fill_order_id = o.order_id;
344  fill_account_id = o.account_id;
345  fill_pays_asset_id = o.pays.asset_id;
346  fill_pays_amount = o.pays.amount;
347  fill_receives_asset_id = o.receives.asset_id;
348  fill_receives_amount = o.receives.amount;
349  fill_fill_price = o.fill_price.to_real();
350  fill_is_maker = o.is_maker;
351  }
352 
353  template<typename T>
354  void operator()( const T& o )
355  {
356  fee_asset = o.fee.asset_id;
357  fee_amount = o.fee.amount;
358  }
359 };
360 
361 void elasticsearch_plugin_impl::doVisitor(const optional <operation_history_object>& oho, visitor_struct& vs) const
362 {
363  const graphene::chain::database& db = _self.database();
364 
365  operation_visitor o_v;
366  oho->op.visit(o_v);
367 
368  auto fee_asset = o_v.fee_asset(db);
369  vs.fee_data.asset = o_v.fee_asset;
370  vs.fee_data.asset_name = fee_asset.symbol;
371  vs.fee_data.amount = o_v.fee_amount;
372  vs.fee_data.amount_units = (o_v.fee_amount.value)/(double)asset::scaled_precision(fee_asset.precision).value;
373 
374  auto transfer_asset = o_v.transfer_asset_id(db);
376  vs.transfer_data.asset_name = transfer_asset.symbol;
379  / (double)asset::scaled_precision(transfer_asset.precision).value;
381  vs.transfer_data.to = o_v.transfer_to;
382 
383  auto fill_pays_asset = o_v.fill_pays_asset_id(db);
384  auto fill_receives_asset = o_v.fill_receives_asset_id(db);
388  vs.fill_data.pays_asset_name = fill_pays_asset.symbol;
391  / (double)asset::scaled_precision(fill_pays_asset.precision).value;
393  vs.fill_data.receives_asset_name = fill_receives_asset.symbol;
396  / (double)asset::scaled_precision(fill_receives_asset.precision).value;
397 
398  auto fill_price = (o_v.fill_receives_amount.value
399  / (double)asset::scaled_precision(fill_receives_asset.precision).value)
400  / (o_v.fill_pays_amount.value
401  / (double)asset::scaled_precision(fill_pays_asset.precision).value);
402  vs.fill_data.fill_price_units = fill_price;
405 }
406 
407 void elasticsearch_plugin_impl::add_elasticsearch( const account_id_type& account_id,
409  uint32_t block_number )
410 {
412 
413  const auto &stats_obj = db.get_account_stats_by_owner( account_id );
414 
415  const auto &ath = db.create<account_history_object>(
416  [&oho,&account_id,&stats_obj]( account_history_object &obj ) {
417  obj.operation_id = oho->id;
418  obj.account = account_id;
419  obj.sequence = stats_obj.total_ops + 1;
420  obj.next = stats_obj.most_recent_op;
421  });
422 
423  db.modify( stats_obj, [&ath]( account_statistics_object &obj ) {
424  obj.most_recent_op = ath.id;
425  obj.total_ops = ath.sequence;
426  });
427 
428  if( block_number > _options.start_es_after_block )
429  {
430  bulk_line_struct.account_history = ath;
431 
432  auto bulk_line = fc::json::to_string(bulk_line_struct, fc::json::legacy_generator);
433 
434  fc::mutable_variant_object bulk_header;
435  bulk_header["_index"] = index_name;
436  if( !is_es_version_7_or_above )
437  bulk_header["_type"] = "_doc";
438  bulk_header["_id"] = std::string( ath.id );
439  auto prepare = graphene::utilities::createBulk(bulk_header, std::move(bulk_line));
440  std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk_lines));
441 
442  approximate_bulk_size += bulk_lines.back().size();
443 
444  if( bulk_lines.size() >= limit_documents
445  || approximate_bulk_size >= graphene::utilities::es_client::request_size_threshold )
446  send_bulk( block_number );
447  }
448  cleanObjects(ath, account_id);
449 }
450 
451 void elasticsearch_plugin_impl::cleanObjects( const account_history_object& ath,
452  const account_id_type& account_id )
453 {
455  // remove everything except current object from ath
456  const auto &his_idx = db.get_index_type<account_history_index>();
457  const auto &by_seq_idx = his_idx.indices().get<by_seq>();
458  auto itr = by_seq_idx.lower_bound(boost::make_tuple(account_id, 0));
459  if (itr != by_seq_idx.end() && itr->account == account_id && itr->id != ath.id) {
460  // if found, remove the entry
461  const auto remove_op_id = itr->operation_id;
462  const auto itr_remove = itr;
463  ++itr;
464  db.remove( *itr_remove );
465  // modify previous node's next pointer
466  // this should be always true, but just have a check here
467  if( itr != by_seq_idx.end() && itr->account == account_id )
468  {
469  db.modify( *itr, []( account_history_object& obj ){
470  obj.next = account_history_id_type();
471  });
472  }
473  // do the same on oho
474  const auto &by_opid_idx = his_idx.indices().get<by_opid>();
475  if (by_opid_idx.find(remove_op_id) == by_opid_idx.end()) {
476  db.remove(remove_op_id(db));
477  }
478  }
479 }
480 
481 } // end namespace detail
482 
484  plugin(app),
485  my( std::make_unique<detail::elasticsearch_plugin_impl>(*this) )
486 {
487  // Nothing else to do
488 }
489 
491 
493 {
494  return "elasticsearch";
495 }
497 {
498  return "Stores account history data in elasticsearch database(EXPERIMENTAL).";
499 }
500 
502  boost::program_options::options_description& cli,
503  boost::program_options::options_description& cfg
504  )
505 {
506  cli.add_options()
507  ("elasticsearch-node-url", boost::program_options::value<std::string>(),
508  "Elastic Search database node url(http://localhost:9200/)")
509  ("elasticsearch-basic-auth", boost::program_options::value<std::string>(),
510  "Pass basic auth to elasticsearch database('')")
511  ("elasticsearch-bulk-replay", boost::program_options::value<uint32_t>(),
512  "Number of bulk documents to index on replay(10000)")
513  ("elasticsearch-bulk-sync", boost::program_options::value<uint32_t>(),
514  "Number of bulk documents to index on a syncronied chain(100)")
515  ("elasticsearch-index-prefix", boost::program_options::value<std::string>(),
516  "Add a prefix to the index(bitshares-)")
517  ("elasticsearch-max-mapping-depth", boost::program_options::value<uint16_t>(),
518  "The maximum index mapping depth (index.mapping.depth.limit) setting in ES, "
519  "should be >=2. (20)")
520  ("elasticsearch-start-es-after-block", boost::program_options::value<uint32_t>(),
521  "Start doing ES job after block(0)")
522  ("elasticsearch-visitor", boost::program_options::value<bool>(),
523  "Use visitor to index additional data(slows down the replay(false))")
524  ("elasticsearch-operation-object", boost::program_options::value<bool>(),
525  "Save operation as object(true)")
526  ("elasticsearch-operation-string", boost::program_options::value<bool>(),
527  "Save operation as string. Needed to serve history api calls(false)")
528  ("elasticsearch-mode", boost::program_options::value<uint16_t>(),
529  "Mode of operation: only_save(0), only_query(1), all(2) - Default: 0")
530  ;
531  cfg.add(cli);
532 }
533 
534 void detail::elasticsearch_plugin_impl::init_program_options(const boost::program_options::variables_map& options)
535 {
536  _options.init( options );
537 
538  if( _options.visitor )
539  bulk_line_struct.additional_data = visitor_struct();
540 
541  es = std::make_unique<graphene::utilities::es_client>( _options.elasticsearch_url, _options.auth );
542 
543  FC_ASSERT( es->check_status(), "ES database is not up in url ${url}", ("url", _options.elasticsearch_url) );
544 
545  es->check_version_7_or_above( is_es_version_7_or_above );
546 }
547 
548 void detail::elasticsearch_plugin_impl::plugin_options::init(const boost::program_options::variables_map& options)
549 {
550  utilities::get_program_option( options, "elasticsearch-node-url", elasticsearch_url );
551  utilities::get_program_option( options, "elasticsearch-basic-auth", auth );
552  utilities::get_program_option( options, "elasticsearch-bulk-replay", bulk_replay );
553  utilities::get_program_option( options, "elasticsearch-bulk-sync", bulk_sync );
554  utilities::get_program_option( options, "elasticsearch-index-prefix", index_prefix );
555  utilities::get_program_option( options, "elasticsearch-max-mapping-depth", max_mapping_depth );
556  utilities::get_program_option( options, "elasticsearch-start-es-after-block", start_es_after_block );
557  utilities::get_program_option( options, "elasticsearch-visitor", visitor );
558  utilities::get_program_option( options, "elasticsearch-operation-object", operation_object );
559  utilities::get_program_option( options, "elasticsearch-operation-string", operation_string );
560 
561  FC_ASSERT( max_mapping_depth >= 2, "The minimum value of elasticsearch-max-mapping-depth is 2" );
562 
563  auto es_mode = static_cast<uint16_t>( elasticsearch_mode );
564  utilities::get_program_option( options, "elasticsearch-mode", es_mode );
565  if( es_mode > static_cast<uint16_t>( mode::all ) )
566  FC_THROW_EXCEPTION( graphene::chain::plugin_exception, "Elasticsearch mode not valid" );
567  elasticsearch_mode = static_cast<mode>( es_mode );
568 
569  if( mode::all == elasticsearch_mode && !operation_string )
570  {
571  FC_THROW_EXCEPTION( graphene::chain::plugin_exception,
572  "If elasticsearch-mode is set to all then elasticsearch-operation-string need to be true");
573  }
574 }
575 
576 void elasticsearch_plugin::plugin_initialize(const boost::program_options::variables_map& options)
577 {
578  my->init_program_options( options );
579 
582 
583  if( my->_options.elasticsearch_mode != mode::only_query )
584  {
585  // connect with group 0 to process before some special steps (e.g. snapshot or next_object_id)
586  database().applied_block.connect( 0, [this](const signed_block &b) {
587  my->update_account_histories(b);
588  });
589  }
590 }
591 
593 {
594  // Nothing to do
595 }
596 
597 static operation_history_object fromEStoOperation(const variant& source)
598 {
600 
601  const auto operation_id = source["account_history"]["operation_id"];
602  fc::from_variant( operation_id, result.id, GRAPHENE_MAX_NESTED_OBJECTS );
603 
604  const auto op = fc::json::from_string(source["operation_history"]["op"].as_string());
606 
607  const auto operation_result = fc::json::from_string(source["operation_history"]["operation_result"].as_string());
609 
610  result.block_num = source["block_data"]["block_num"].as_uint64();
611  result.trx_in_block = source["operation_history"]["trx_in_block"].as_uint64();
612  result.op_in_trx = source["operation_history"]["op_in_trx"].as_uint64();
613  result.trx_in_block = source["operation_history"]["virtual_op"].as_uint64();
614 
615  return result;
616 }
617 
618 operation_history_object elasticsearch_plugin::get_operation_by_id( const operation_history_id_type& id ) const
619 {
620  const string operation_id_string = std::string(object_id_type(id));
621 
622  const string query = R"(
623  {
624  "query": {
625  "match":
626  {
627  "account_history.operation_id": ")" + operation_id_string + R"("
628  }
629  }
630  }
631  )";
632 
633  const auto response = my->es->query( my->_options.index_prefix + "*/_doc/_search", query );
634  variant variant_response = fc::json::from_string(response);
635  const auto source = variant_response["hits"]["hits"][size_t(0)]["_source"];
636  return fromEStoOperation(source);
637 }
638 
639 vector<operation_history_object> elasticsearch_plugin::get_account_history(
640  const account_id_type& account_id,
641  const operation_history_id_type& stop,
642  uint64_t limit,
643  const operation_history_id_type& start ) const
644 {
645  const auto account_id_string = std::string( account_id );
646 
647  const auto stop_number = stop.instance.value;
648  const auto start_number = start.instance.value;
649 
650  string range = "";
651  if(stop_number == 0)
652  range = " AND operation_id_num: ["+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]";
653  else if(stop_number > 0)
654  range = " AND operation_id_num: {"+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]";
655  // FIXME the code above is either redundant or buggy
656 
657  const string query = R"(
658  {
659  "size": )" + fc::to_string(limit) + R"(,
660  "sort" : [{ "operation_id_num" : {"order" : "desc"}}],
661  "query": {
662  "bool": {
663  "must": [
664  {
665  "query_string": {
666  "query": "account_history.account: )" + account_id_string + range + R"("
667  }
668  }
669  ]
670  }
671  }
672  }
673  )";
674 
675  vector<operation_history_object> result;
676 
677  if( !my->es->check_status() )
678  return result;
679 
680  const auto response = my->es->query( my->_options.index_prefix + "*/_doc/_search", query );
681 
682  variant variant_response = fc::json::from_string(response);
683 
684  const auto hits = variant_response["hits"]["total"];
685  size_t size;
686  if( hits.is_object() ) // ES-7 ?
687  size = hits["value"].as_uint64();
688  else // probably ES-6
689  size = hits.as_uint64();
690  size = std::min( size, size_t(limit) );
691 
692  const auto& data = variant_response["hits"]["hits"];
693  for( size_t i=0; i<size; ++i )
694  {
695  const auto& source = data[i]["_source"];
696  result.push_back(fromEStoOperation(source));
697  }
698  return result;
699 }
700 
702 {
703  return my->_options.elasticsearch_mode;
704 }
705 
706 } }
static string to_string(const variant &v, output_formatting format=stringify_large_ints_and_doubles, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
Definition: json.cpp:650
fc::time_point_sec timestamp
Definition: block.hpp:35
static fc::variant adapt_static_variant(const fc::variants &v, uint16_t max_depth)
void modify(const T &obj, const Lambda &m)
elasticsearch_plugin(graphene::app::application &app)
Wraps a derived index to intercept calls to create, modify, and remove so that callbacks may be fired...
Definition: index.hpp:312
void plugin_initialize(const boost::program_options::variables_map &options) override
Perform early startup routines and register plugin indexes, callbacks, etc.
std::string to_iso_string() const
Definition: time.cpp:24
const index_type & indices() const
fc::signal< void(const signed_block &)> applied_block
Definition: database.hpp:606
#define GRAPHENE_MAX_NESTED_OBJECTS
Definition: config.hpp:33
tracks the history of all logical operations on blockchain stateAll operations and virtual operations...
tracks the blockchain state in an extensible manner
Definition: database.hpp:70
const IndexType & get_index_type() const
Definition: api.cpp:48
#define elog(FORMAT,...)
Definition: logger.hpp:129
std::string plugin_name() const override
Get the name of the plugin.
uint64_t as_uint64() const
Definition: variant.cpp:398
void get_program_option(const boost::program_options::variables_map &from, const std::string &key, T &to)
std::vector< std::string > createBulk(const fc::mutable_variant_object &bulk_header, std::string &&data)
account_history_id_type next
the operation position within the given account
const vector< optional< operation_history_object > > & get_applied_operations() const
Definition: db_block.cpp:553
void operator()(const graphene::chain::fill_order_operation &o)
provides stack-based nullable value similar to boost::optional
Definition: optional.hpp:20
void plugin_set_program_options(boost::program_options::options_description &cli, boost::program_options::options_description &cfg) override
Fill in command line parameters used by the plugin.
uint32_t block_num() const
Definition: block.hpp:34
variant_object & get_object()
Definition: variant.cpp:554
static share_type scaled_precision(uint8_t precision)
Definition: asset.cpp:363
static constexpr size_t request_size_threshold
When doing bulk operations, call send_bulk when the approximate size of pending data reaches this val...
Transfers an amount of one asset from one account to another.
Definition: transfer.hpp:45
object_id_type id
Definition: object.hpp:69
void to_variant(const flat_set< T, A... > &var, variant &vo, uint32_t _max_depth)
Definition: flat.hpp:105
chain::database & database()
Definition: plugin.hpp:115
#define edump(SEQ)
Definition: logger.hpp:182
microseconds seconds(int64_t s)
Definition: time.hpp:34
a node in a linked list of operation_history_objectsAccount history is important for users and wallet...
#define ilog(FORMAT,...)
Definition: logger.hpp:117
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
Definition: exception.hpp:345
static fc::variant adapt(const fc::variant_object &op, uint16_t max_depth)
stores null, int64, uint64, double, bool, string, std::vector<variant>, and variant_object&#39;s.
Definition: variant.hpp:198
std::string plugin_description() const override
Get the description of the plugin.
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
Definition: exception.hpp:379
double to_real() const
Definition: asset.hpp:127
void plugin_startup() override
Begin normal runtime operations.
asset amount
The amount of asset to transfer from from to to.
Definition: transfer.hpp:58
void from_variant(const variant &var, flat_set< T, A... > &vo, uint32_t _max_depth)
Definition: flat.hpp:116
std::string to_string(double)
Definition: string.cpp:73
const account_statistics_object & get_account_stats_by_owner(account_id_type owner) const
Definition: db_getter.cpp:142
asset_id_type asset_id
Definition: asset.hpp:37
account_id_type from
Account to transfer asset from.
Definition: transfer.hpp:54
static variant from_string(const string &utf8_str, parse_type ptype=legacy_parser, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
Definition: json.cpp:458
void remove(const object &obj)
vector< operation_history_object > get_account_history(const account_id_type &account_id, const operation_history_id_type &stop=operation_history_id_type(), uint64_t limit=100, const operation_history_id_type &start=operation_history_id_type()) const
#define FC_PACK_MAX_DEPTH
Definition: config.hpp:3
static time_point now()
Definition: time.cpp:13
#define FC_CAPTURE_LOG_AND_RETHROW(...)
Definition: exception.hpp:415
vector< processed_transaction > transactions
Definition: block.hpp:68
operation_history_object get_operation_by_id(const operation_history_id_type &id) const
void operation_get_required_authorities(const operation &op, flat_set< account_id_type > &active, flat_set< account_id_type > &owner, vector< authority > &other, bool ignore_custom_operation_required_auths)
Definition: operations.cpp:103
void operation_get_impacted_accounts(const operation &op, flat_set< account_id_type > &result, bool ignore_custom_op_required_auths)
Definition: db_notify.cpp:391
account_id_type to
Account to transfer asset to.
Definition: transfer.hpp:56
const T & create(F &&constructor)
An order-perserving dictionary of variant&#39;s.
void operator()(const graphene::chain::transfer_operation &o)
T value
Definition: safe.hpp:22