BitShares-Core  5.0.0
BitShares blockchain implementation and command-line interface software
elasticsearch_plugin.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Cryptonomex, Inc., and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 
28 #include <graphene/chain/hardfork.hpp>
29 #include <curl/curl.h>
30 
31 namespace graphene { namespace elasticsearch {
32 
33 namespace detail
34 {
35 
37 {
38  public:
40  : _self( _plugin )
41  { curl = curl_easy_init(); }
43 
44  bool update_account_histories( const signed_block& b );
45 
47  {
48  return _self.database();
49  }
50 
52  primary_index< operation_history_index >* _oho_index;
53 
54  std::string _elasticsearch_node_url = "http://localhost:9200/";
55  uint32_t _elasticsearch_bulk_replay = 10000;
56  uint32_t _elasticsearch_bulk_sync = 100;
57  bool _elasticsearch_visitor = false;
58  std::string _elasticsearch_basic_auth = "";
59  std::string _elasticsearch_index_prefix = "bitshares-";
64  CURL *curl; // curl handler
65  vector <string> bulk_lines; // vector of op lines
66  vector<std::string> prepare;
67 
69  uint32_t limit_documents;
70  int16_t op_type;
75  std::string bulk_line;
76  std::string index_name;
77  bool is_sync = false;
78  private:
79  bool add_elasticsearch( const account_id_type account_id, const optional<operation_history_object>& oho, const uint32_t block_number );
80  const account_transaction_history_object& addNewEntry(const account_statistics_object& stats_obj,
81  const account_id_type& account_id,
82  const optional <operation_history_object>& oho);
83  const account_statistics_object& getStatsObject(const account_id_type& account_id);
84  void growStats(const account_statistics_object& stats_obj, const account_transaction_history_object& ath);
85  void getOperationType(const optional <operation_history_object>& oho);
86  void doOperationHistory(const optional <operation_history_object>& oho);
87  void doBlock(uint32_t trx_in_block, const signed_block& b);
88  void doVisitor(const optional <operation_history_object>& oho);
89  void checkState(const fc::time_point_sec& block_time);
90  void cleanObjects(const account_transaction_history_id_type& ath, const account_id_type& account_id);
91  void createBulkLine(const account_transaction_history_object& ath);
92  void prepareBulk(const account_transaction_history_id_type& ath_id);
93  void populateESstruct();
94 };
95 
97 {
98  if (curl) {
99  curl_easy_cleanup(curl);
100  curl = nullptr;
101  }
102  return;
103 }
104 
106 {
107  checkState(b.timestamp);
109 
111  const vector<optional< operation_history_object > >& hist = db.get_applied_operations();
112  bool is_first = true;
113  auto skip_oho_id = [&is_first,&db,this]() {
114  if( is_first && db._undo_db.enabled() ) // this ensures that the current id is rolled back on undo
115  {
116  db.remove( db.create<operation_history_object>( []( operation_history_object& obj) {} ) );
117  is_first = false;
118  }
119  else
120  _oho_index->use_next_id();
121  };
122  for( const optional< operation_history_object >& o_op : hist ) {
123  optional <operation_history_object> oho;
124 
125  auto create_oho = [&]() {
126  is_first = false;
127  return optional<operation_history_object>(
129  if (o_op.valid())
130  {
131  h.op = o_op->op;
132  h.result = o_op->result;
133  h.block_num = o_op->block_num;
134  h.trx_in_block = o_op->trx_in_block;
135  h.op_in_trx = o_op->op_in_trx;
136  h.virtual_op = o_op->virtual_op;
137  }
138  }));
139  };
140 
141  if( !o_op.valid() ) {
142  skip_oho_id();
143  continue;
144  }
145  oho = create_oho();
146 
147  // populate what we can before impacted loop
148  getOperationType(oho);
149  doOperationHistory(oho);
150  doBlock(oho->trx_in_block, b);
152  doVisitor(oho);
153 
154  const operation_history_object& op = *o_op;
155 
156  // get the set of accounts this operation applies to
157  flat_set<account_id_type> impacted;
158  vector<authority> other;
159  // fee_payer is added here
160  operation_get_required_authorities( op.op, impacted, impacted, other,
161  MUST_IGNORE_CUSTOM_OP_REQD_AUTHS( db.head_block_time() ) );
162 
163  if( op.op.is_type< account_create_operation >() )
164  impacted.insert( op.result.get<object_id_type>() );
165  else
166  operation_get_impacted_accounts( op.op, impacted,
167  MUST_IGNORE_CUSTOM_OP_REQD_AUTHS( db.head_block_time() ) );
168 
169  for( auto& a : other )
170  for( auto& item : a.account_auths )
171  impacted.insert( item.first );
172 
173  for( auto& account_id : impacted )
174  {
175  if(!add_elasticsearch( account_id, oho, b.block_num() ))
176  {
177  elog( "Error adding data to Elastic Search: block num ${b}, account ${a}, data ${d}",
178  ("b",b.block_num()) ("a",account_id) ("d", oho) );
179  return false;
180  }
181  }
182  }
183  // we send bulk at end of block when we are in sync for better real time client experience
184  if(is_sync)
185  {
186  populateESstruct();
187  if(es.bulk_lines.size() > 0)
188  {
189  prepare.clear();
190  if(!graphene::utilities::SendBulk(std::move(es)))
191  {
192  // Note: although called with `std::move()`, `es` is not updated in `SendBulk()`
193  elog( "Error sending ${n} lines of bulk data to Elastic Search, the first lines are:",
194  ("n",es.bulk_lines.size()) );
195  for( size_t i = 0; i < es.bulk_lines.size() && i < 10; ++i )
196  {
197  edump( (es.bulk_lines[i]) );
198  }
199  return false;
200  }
201  else
202  bulk_lines.clear();
203  }
204  }
205 
206  if(bulk_lines.size() != limit_documents)
207  bulk_lines.reserve(limit_documents);
208 
209  return true;
210 }
211 
212 void elasticsearch_plugin_impl::checkState(const fc::time_point_sec& block_time)
213 {
214  if((fc::time_point::now() - block_time) < fc::seconds(30))
215  {
217  is_sync = true;
218  }
219  else
220  {
222  is_sync = false;
223  }
224 }
225 
226 void elasticsearch_plugin_impl::getOperationType(const optional <operation_history_object>& oho)
227 {
228  if (!oho->id.is_null())
229  op_type = oho->op.which();
230 }
231 
232 void elasticsearch_plugin_impl::doOperationHistory(const optional <operation_history_object>& oho)
233 {
234  os.trx_in_block = oho->trx_in_block;
235  os.op_in_trx = oho->op_in_trx;
236  os.operation_result = fc::json::to_string(oho->result);
237  os.virtual_op = oho->virtual_op;
238 
241  adaptor_struct adaptor;
242  os.op_object = adaptor.adapt(os.op_object.get_object());
243  }
245  os.op = fc::json::to_string(oho->op);
246 }
247 
248 void elasticsearch_plugin_impl::doBlock(uint32_t trx_in_block, const signed_block& b)
249 {
250  std::string trx_id = "";
251  if(trx_in_block < b.transactions.size())
252  trx_id = b.transactions[trx_in_block].id().str();
253  bs.block_num = b.block_num();
254  bs.block_time = b.timestamp;
255  bs.trx_id = trx_id;
256 }
257 
258 void elasticsearch_plugin_impl::doVisitor(const optional <operation_history_object>& oho)
259 {
261 
262  operation_visitor o_v;
263  oho->op.visit(o_v);
264 
265  auto fee_asset = o_v.fee_asset(db);
266  vs.fee_data.asset = o_v.fee_asset;
267  vs.fee_data.asset_name = fee_asset.symbol;
269  vs.fee_data.amount_units = (o_v.fee_amount.value)/(double)asset::scaled_precision(fee_asset.precision).value;
270 
271  auto transfer_asset = o_v.transfer_asset_id(db);
273  vs.transfer_data.asset_name = transfer_asset.symbol;
275  vs.transfer_data.amount_units = (o_v.transfer_amount.value)/(double)asset::scaled_precision(transfer_asset.precision).value;
278 
279  auto fill_pays_asset = o_v.fill_pays_asset_id(db);
280  auto fill_receives_asset = o_v.fill_receives_asset_id(db);
284  vs.fill_data.pays_asset_name = fill_pays_asset.symbol;
286  vs.fill_data.pays_amount_units = (o_v.fill_pays_amount.value)/(double)asset::scaled_precision(fill_pays_asset.precision).value;
288  vs.fill_data.receives_asset_name = fill_receives_asset.symbol;
290  vs.fill_data.receives_amount_units = (o_v.fill_receives_amount.value)/(double)asset::scaled_precision(fill_receives_asset.precision).value;
291 
292  auto fill_price = (o_v.fill_receives_amount.value/(double)asset::scaled_precision(fill_receives_asset.precision).value) /
293  (o_v.fill_pays_amount.value/(double)asset::scaled_precision(fill_pays_asset.precision).value);
294  vs.fill_data.fill_price_units = fill_price;
297 }
298 
299 bool elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account_id,
300  const optional <operation_history_object>& oho,
301  const uint32_t block_number)
302 {
303  const auto &stats_obj = getStatsObject(account_id);
304  const auto &ath = addNewEntry(stats_obj, account_id, oho);
305  growStats(stats_obj, ath);
306  if(block_number > _elasticsearch_start_es_after_block) {
307  createBulkLine(ath);
308  prepareBulk(ath.id);
309  }
310  cleanObjects(ath.id, account_id);
311 
312  if (curl && bulk_lines.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech
313  prepare.clear();
314  populateESstruct();
315  if(!graphene::utilities::SendBulk(std::move(es)))
316  {
317  // Note: although called with `std::move()`, `es` is not updated in `SendBulk()`
318  elog( "Error sending ${n} lines of bulk data to Elastic Search, the first lines are:",
319  ("n",es.bulk_lines.size()) );
320  for( size_t i = 0; i < es.bulk_lines.size() && i < 10; ++i )
321  {
322  edump( (es.bulk_lines[i]) );
323  }
324  return false;
325  }
326  else
327  bulk_lines.clear();
328  }
329 
330  return true;
331 }
332 
333 const account_statistics_object& elasticsearch_plugin_impl::getStatsObject(const account_id_type& account_id)
334 {
336  const auto &stats_obj = db.get_account_stats_by_owner(account_id);
337 
338  return stats_obj;
339 }
340 
341 const account_transaction_history_object& elasticsearch_plugin_impl::addNewEntry(const account_statistics_object& stats_obj,
342  const account_id_type& account_id,
343  const optional <operation_history_object>& oho)
344 {
347  obj.operation_id = oho->id;
348  obj.account = account_id;
349  obj.sequence = stats_obj.total_ops + 1;
350  obj.next = stats_obj.most_recent_op;
351  });
352 
353  return ath;
354 }
355 
356 void elasticsearch_plugin_impl::growStats(const account_statistics_object& stats_obj,
358 {
360  db.modify(stats_obj, [&](account_statistics_object &obj) {
361  obj.most_recent_op = ath.id;
362  obj.total_ops = ath.sequence;
363  });
364 }
365 
366 void elasticsearch_plugin_impl::createBulkLine(const account_transaction_history_object& ath)
367 {
371  bulk_line_struct.operation_id_num = ath.operation_id.instance.value;
376 }
377 
378 void elasticsearch_plugin_impl::prepareBulk(const account_transaction_history_id_type& ath_id)
379 {
380  const std::string _id = fc::json::to_string(ath_id);
381  fc::mutable_variant_object bulk_header;
382  bulk_header["_index"] = index_name;
383  bulk_header["_type"] = "data";
384  bulk_header["_id"] = fc::to_string(ath_id.space_id) + "." + fc::to_string(ath_id.type_id) + "."
385  + fc::to_string(ath_id.instance.value);
386  prepare = graphene::utilities::createBulk(bulk_header, std::move(bulk_line));
387  std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk_lines));
388  prepare.clear();
389 }
390 
391 void elasticsearch_plugin_impl::cleanObjects(const account_transaction_history_id_type& ath_id, const account_id_type& account_id)
392 {
394  // remove everything except current object from ath
395  const auto &his_idx = db.get_index_type<account_transaction_history_index>();
396  const auto &by_seq_idx = his_idx.indices().get<by_seq>();
397  auto itr = by_seq_idx.lower_bound(boost::make_tuple(account_id, 0));
398  if (itr != by_seq_idx.end() && itr->account == account_id && itr->id != ath_id) {
399  // if found, remove the entry
400  const auto remove_op_id = itr->operation_id;
401  const auto itr_remove = itr;
402  ++itr;
403  db.remove( *itr_remove );
404  // modify previous node's next pointer
405  // this should be always true, but just have a check here
406  if( itr != by_seq_idx.end() && itr->account == account_id )
407  {
408  db.modify( *itr, [&]( account_transaction_history_object& obj ){
409  obj.next = account_transaction_history_id_type();
410  });
411  }
412  // do the same on oho
413  const auto &by_opid_idx = his_idx.indices().get<by_opid>();
414  if (by_opid_idx.find(remove_op_id) == by_opid_idx.end()) {
415  db.remove(remove_op_id(db));
416  }
417  }
418 }
419 
420 void elasticsearch_plugin_impl::populateESstruct()
421 {
422  es.curl = curl;
423  es.bulk_lines = std::move(bulk_lines);
427  es.endpoint = "";
428  es.query = "";
429 }
430 
431 } // end namespace detail
432 
434  my( new detail::elasticsearch_plugin_impl(*this) )
435 {
436 }
437 
439 {
440 }
441 
443 {
444  return "elasticsearch";
445 }
447 {
448  return "Stores account history data in elasticsearch database(EXPERIMENTAL).";
449 }
450 
452  boost::program_options::options_description& cli,
453  boost::program_options::options_description& cfg
454  )
455 {
456  cli.add_options()
457  ("elasticsearch-node-url", boost::program_options::value<std::string>(),
458  "Elastic Search database node url(http://localhost:9200/)")
459  ("elasticsearch-bulk-replay", boost::program_options::value<uint32_t>(),
460  "Number of bulk documents to index on replay(10000)")
461  ("elasticsearch-bulk-sync", boost::program_options::value<uint32_t>(),
462  "Number of bulk documents to index on a syncronied chain(100)")
463  ("elasticsearch-visitor", boost::program_options::value<bool>(),
464  "Use visitor to index additional data(slows down the replay(false))")
465  ("elasticsearch-basic-auth", boost::program_options::value<std::string>(),
466  "Pass basic auth to elasticsearch database('')")
467  ("elasticsearch-index-prefix", boost::program_options::value<std::string>(),
468  "Add a prefix to the index(bitshares-)")
469  ("elasticsearch-operation-object", boost::program_options::value<bool>(),
470  "Save operation as object(true)")
471  ("elasticsearch-start-es-after-block", boost::program_options::value<uint32_t>(),
472  "Start doing ES job after block(0)")
473  ("elasticsearch-operation-string", boost::program_options::value<bool>(),
474  "Save operation as string. Needed to serve history api calls(false)")
475  ("elasticsearch-mode", boost::program_options::value<uint16_t>(),
476  "Mode of operation: only_save(0), only_query(1), all(2) - Default: 0")
477  ;
478  cfg.add(cli);
479 }
480 
481 void elasticsearch_plugin::plugin_initialize(const boost::program_options::variables_map& options)
482 {
485 
486  if (options.count("elasticsearch-node-url")) {
487  my->_elasticsearch_node_url = options["elasticsearch-node-url"].as<std::string>();
488  }
489  if (options.count("elasticsearch-bulk-replay")) {
490  my->_elasticsearch_bulk_replay = options["elasticsearch-bulk-replay"].as<uint32_t>();
491  }
492  if (options.count("elasticsearch-bulk-sync")) {
493  my->_elasticsearch_bulk_sync = options["elasticsearch-bulk-sync"].as<uint32_t>();
494  }
495  if (options.count("elasticsearch-visitor")) {
496  my->_elasticsearch_visitor = options["elasticsearch-visitor"].as<bool>();
497  }
498  if (options.count("elasticsearch-basic-auth")) {
499  my->_elasticsearch_basic_auth = options["elasticsearch-basic-auth"].as<std::string>();
500  }
501  if (options.count("elasticsearch-index-prefix")) {
502  my->_elasticsearch_index_prefix = options["elasticsearch-index-prefix"].as<std::string>();
503  }
504  if (options.count("elasticsearch-operation-object")) {
505  my->_elasticsearch_operation_object = options["elasticsearch-operation-object"].as<bool>();
506  }
507  if (options.count("elasticsearch-start-es-after-block")) {
508  my->_elasticsearch_start_es_after_block = options["elasticsearch-start-es-after-block"].as<uint32_t>();
509  }
510  if (options.count("elasticsearch-operation-string")) {
511  my->_elasticsearch_operation_string = options["elasticsearch-operation-string"].as<bool>();
512  }
513  if (options.count("elasticsearch-mode")) {
514  const auto option_number = options["elasticsearch-mode"].as<uint16_t>();
515  if(option_number > mode::all)
516  FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Elasticsearch mode not valid");
517  my->_elasticsearch_mode = static_cast<mode>(options["elasticsearch-mode"].as<uint16_t>());
518  }
519 
520  if(my->_elasticsearch_mode != mode::only_query) {
521  if (my->_elasticsearch_mode == mode::all && !my->_elasticsearch_operation_string)
522  FC_THROW_EXCEPTION(graphene::chain::plugin_exception,
523  "If elasticsearch-mode is set to all then elasticsearch-operation-string need to be true");
524 
525  database().applied_block.connect([this](const signed_block &b) {
526  if (!my->update_account_histories(b))
527  FC_THROW_EXCEPTION(graphene::chain::plugin_exception,
528  "Error populating ES database, we are going to keep trying.");
529  });
530  }
531 }
532 
534 {
536  es.curl = my->curl;
537  es.elasticsearch_url = my->_elasticsearch_node_url;
538  es.auth = my->_elasticsearch_basic_auth;
539 
541  FC_THROW_EXCEPTION(fc::exception, "ES database is not up in url ${url}", ("url", my->_elasticsearch_node_url));
542  ilog("elasticsearch ACCOUNT HISTORY: plugin_startup() begin");
543 }
544 
546 {
547  const string operation_id_string = std::string(object_id_type(id));
548 
549  const string query = R"(
550  {
551  "query": {
552  "match":
553  {
554  "account_history.operation_id": )" + operation_id_string + R"("
555  }
556  }
557  }
558  )";
559 
560  auto es = prepareHistoryQuery(query);
563  const auto source = variant_response["hits"]["hits"][size_t(0)]["_source"];
564  return fromEStoOperation(source);
565 }
566 
567 vector<operation_history_object> elasticsearch_plugin::get_account_history(
568  const account_id_type account_id,
569  operation_history_id_type stop = operation_history_id_type(),
570  unsigned limit = 100,
571  operation_history_id_type start = operation_history_id_type())
572 {
573  const string account_id_string = std::string(object_id_type(account_id));
574 
575  const auto stop_number = stop.instance.value;
576  const auto start_number = start.instance.value;
577 
578  string range = "";
579  if(stop_number == 0)
580  range = " AND operation_id_num: ["+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]";
581  else if(stop_number > 0)
582  range = " AND operation_id_num: {"+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]";
583 
584  const string query = R"(
585  {
586  "size": )" + fc::to_string(limit) + R"(,
587  "sort" : [{ "operation_id_num" : {"order" : "desc"}}],
588  "query": {
589  "bool": {
590  "must": [
591  {
592  "query_string": {
593  "query": "account_history.account: )" + account_id_string + range + R"("
594  }
595  }
596  ]
597  }
598  }
599  }
600  )";
601 
602  auto es = prepareHistoryQuery(query);
603 
604  vector<operation_history_object> result;
605 
607  return result;
608 
611 
612  const auto hits = variant_response["hits"]["total"];
613  uint32_t size;
614  if( hits.is_object() ) // ES-7 ?
615  size = static_cast<uint32_t>(hits["value"].as_uint64());
616  else // probably ES-6
617  size = static_cast<uint32_t>(hits.as_uint64());
618  size = std::min( size, limit );
619 
620  for(unsigned i=0; i<size; i++)
621  {
622  const auto source = variant_response["hits"]["hits"][size_t(i)]["_source"];
623  result.push_back(fromEStoOperation(source));
624  }
625  return result;
626 }
627 
628 operation_history_object elasticsearch_plugin::fromEStoOperation(variant source)
629 {
631 
632  const auto operation_id = source["account_history"]["operation_id"];
633  fc::from_variant( operation_id, result.id, GRAPHENE_MAX_NESTED_OBJECTS );
634 
635  const auto op = fc::json::from_string(source["operation_history"]["op"].as_string());
637 
638  const auto operation_result = fc::json::from_string(source["operation_history"]["operation_result"].as_string());
640 
641  result.block_num = source["block_data"]["block_num"].as_uint64();
642  result.trx_in_block = source["operation_history"]["trx_in_block"].as_uint64();
643  result.op_in_trx = source["operation_history"]["op_in_trx"].as_uint64();
644  result.trx_in_block = source["operation_history"]["virtual_op"].as_uint64();
645 
646  return result;
647 }
648 
649 graphene::utilities::ES elasticsearch_plugin::prepareHistoryQuery(string query)
650 {
651  CURL *curl;
652  curl = curl_easy_init();
653 
655  es.curl = curl;
656  es.elasticsearch_url = my->_elasticsearch_node_url;
657  es.index_prefix = my->_elasticsearch_index_prefix;
658  es.endpoint = es.index_prefix + "*/data/_search";
659  es.query = query;
660 
661  return es;
662 }
663 
665 {
666  return my->_elasticsearch_mode;
667 }
668 
669 
670 } }
bool is_type() const
static string to_string(const variant &v, output_formatting format=stringify_large_ints_and_doubles, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
Definition: json.cpp:638
vector< operation_history_object > get_account_history(const account_id_type account_id, operation_history_id_type stop, unsigned limit, operation_history_id_type start)
void modify(const T &obj, const Lambda &m)
const auto response
Wraps a derived index to intercept calls to create, modify, and remove so that callbacks may be fired...
Definition: index.hpp:309
std::unique_ptr< detail::elasticsearch_plugin_impl > my
virtual void plugin_initialize(const boost::program_options::variables_map &options) override
Perform early startup routines and register plugin indexes, callbacks, etc.
fc::signal< void(const signed_block &)> applied_block
Definition: database.hpp:197
#define GRAPHENE_MAX_NESTED_OBJECTS
Definition: config.hpp:31
tracks the history of all logical operations on blockchain state. All operations and virtual operations...
operation_history_object get_operation_by_id(operation_history_id_type id)
tracks the blockchain state in an extensible manner
Definition: database.hpp:70
Definition: api.cpp:56
#define elog(FORMAT,...)
Definition: logger.hpp:129
optional< visitor_struct > additional_data
a node in a linked list of operation_history_objects. Account history is important for users and wallet...
Used to generate a useful error report when an exception is thrown. At each level in the stack where t...
Definition: exception.hpp:56
primary_index< operation_history_index > * _oho_index
const auto source
virtual void plugin_set_program_options(boost::program_options::options_description &cli, boost::program_options::options_description &cfg) override
Fill in command line parameters used by the plugin.
bool SendBulk(ES &&es)
variant_object & get_object()
Definition: variant.cpp:554
const vector< optional< operation_history_object > > & get_applied_operations() const
Definition: db_block.cpp:548
object_id_type id
Definition: object.hpp:69
chain::database & database()
Definition: plugin.hpp:114
const std::vector< std::string > createBulk(const fc::mutable_variant_object &bulk_header, std::string &&data)
account_transaction_history_object account_history
#define edump(SEQ)
Definition: logger.hpp:182
microseconds seconds(int64_t s)
Definition: time.hpp:34
const std::string simpleQuery(ES &es)
#define ilog(FORMAT,...)
Definition: logger.hpp:117
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
Definition: exception.hpp:378
bool checkES(ES &es)
virtual void plugin_startup() override
Begin normal runtime operations.
const account_statistics_object & get_account_stats_by_owner(account_id_type owner) const
Definition: db_getter.cpp:139
void from_variant(const variant &var, flat_set< T, A... > &vo, uint32_t _max_depth)
Definition: flat.hpp:116
std::string to_string(double)
Definition: string.cpp:73
variant variant_response
account_transaction_history_id_type next
the operation position within the given account
static variant from_string(const string &utf8_str, parse_type ptype=legacy_parser, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
Definition: json.cpp:458
void remove(const object &obj)
const std::string generateIndexName(const fc::time_point_sec &block_date, const std::string &_elasticsearch_index_prefix)
#define FC_PACK_MAX_DEPTH
Definition: config.hpp:3
static time_point now()
Definition: time.cpp:13
static share_type scaled_precision(uint8_t precision)
Definition: asset.hpp:93
std::vector< std::string > bulk_lines
void operation_get_required_authorities(const operation &op, flat_set< account_id_type > &active, flat_set< account_id_type > &owner, vector< authority > &other, bool ignore_custom_operation_required_auths)
Definition: operations.cpp:103
const T & create(F &&constructor)
An order-preserving dictionary of variants.
void operation_get_impacted_accounts(const operation &op, flat_set< account_id_type > &result, bool ignore_custom_operation_required_auths)
Definition: db_notify.cpp:342
account_transaction_history_id_type most_recent_op
const IndexType & get_index_type() const
operation_history_id_type operation_id
the account this operation applies to
T value
Definition: safe.hpp:22
const index_type & indices() const
variant adapt(const variant_object &op)