28 #include <graphene/chain/hardfork.hpp> 30 #include <boost/algorithm/string.hpp> 51 std::string elasticsearch_url =
"http://localhost:9200/";
52 std::string auth =
"";
53 uint32_t bulk_replay = 10000;
54 uint32_t bulk_sync = 100;
56 std::string index_prefix =
"bitshares-";
59 uint16_t max_mapping_depth = 20;
61 uint32_t start_es_after_block = 0;
64 bool operation_object =
true;
65 bool operation_string =
false;
69 void init(
const boost::program_options::variables_map& options);
72 void update_account_histories(
const signed_block& b );
80 plugin_options _options;
82 primary_index< operation_history_index >* _oho_index;
84 uint32_t limit_documents = _options.bulk_replay;
86 std::unique_ptr<graphene::utilities::es_client> es;
88 vector <string> bulk_lines;
89 size_t approximate_bulk_size = 0;
93 std::string index_name;
95 bool is_es_version_7_or_above =
true;
97 void add_elasticsearch(
const account_id_type& account_id,
const optional<operation_history_object>& oho,
98 uint32_t block_number );
99 void send_bulk( uint32_t block_num );
102 void doBlock(uint32_t trx_in_block,
const signed_block& b,
block_struct& bs)
const;
103 void doVisitor(
const optional <operation_history_object>& oho,
visitor_struct& vs)
const;
107 void init_program_options(
const boost::program_options::variables_map& options);
111 const std::string& index_prefix )
114 std::vector<std::string> parts;
115 boost::split(parts, block_date_string, boost::is_any_of(
"-"));
116 std::string index_name = index_prefix + parts[0] +
"-" + parts[1];
120 void elasticsearch_plugin_impl::update_account_histories(
const signed_block& b )
122 checkState(b.timestamp);
123 index_name = generateIndexName(b.timestamp, _options.index_prefix);
127 bool is_first =
true;
128 auto skip_oho_id = [&is_first,&db,
this]() {
129 if( is_first && db._undo_db.enabled() )
135 _oho_index->use_next_id();
137 for(
const optional< operation_history_object >& o_op : hist ) {
138 optional <operation_history_object> oho;
140 auto create_oho = [&]() {
142 return optional<operation_history_object>(
147 h.result = o_op->result;
148 h.block_num = o_op->block_num;
149 h.trx_in_block = o_op->trx_in_block;
150 h.op_in_trx = o_op->op_in_trx;
151 h.virtual_op = o_op->virtual_op;
152 h.is_virtual = o_op->is_virtual;
153 h.block_time = o_op->block_time;
158 if( !o_op.valid() ) {
165 if( o_op->block_num > _options.start_es_after_block )
167 bulk_line_struct.operation_type = oho->op.which();
168 bulk_line_struct.operation_id_num = oho->id.instance();
169 doOperationHistory( oho, bulk_line_struct.operation_history );
170 doBlock( oho->trx_in_block, b, bulk_line_struct.block_data );
171 if( _options.visitor )
172 doVisitor( oho, *bulk_line_struct.additional_data );
178 flat_set<account_id_type> impacted;
179 vector<authority> other;
182 MUST_IGNORE_CUSTOM_OP_REQD_AUTHS( db.head_block_time() ) );
191 MUST_IGNORE_CUSTOM_OP_REQD_AUTHS( db.head_block_time() ) );
197 if( op_result.value.impacted_accounts.valid() )
199 for(
const auto& a : *op_result.value.impacted_accounts )
200 impacted.insert( a );
204 for(
const auto& a : other )
205 for(
const auto& item : a.account_auths )
206 impacted.insert( item.first );
208 for(
const auto& account_id : impacted )
211 add_elasticsearch( account_id, oho, b.block_num() );
217 if( is_sync && !bulk_lines.empty() )
218 send_bulk( b.block_num() );
222 void elasticsearch_plugin_impl::send_bulk( uint32_t block_num )
224 ilog(
"Sending ${n} lines of bulk data to ElasticSearch at block ${b}, approximate size ${s}",
225 (
"n",bulk_lines.size())(
"b",block_num)(
"s",approximate_bulk_size) );
226 if( !es->send_bulk( bulk_lines ) )
228 elog(
"Error sending ${n} lines of bulk data to ElasticSearch, the first lines are:",
229 (
"n",bulk_lines.size()) );
230 const auto log_max = std::min( bulk_lines.size(), size_t(10) );
231 for(
size_t i = 0; i < log_max; ++i )
233 edump( (bulk_lines[i]) );
236 "Error populating ES database, we are going to keep trying." );
239 approximate_bulk_size = 0;
240 bulk_lines.reserve(limit_documents);
247 limit_documents = _options.bulk_sync;
252 limit_documents = _options.bulk_replay;
255 bulk_lines.reserve(limit_documents);
262 template<
typename OpType>
265 return op.fee_payer();
277 if(_options.operation_string)
282 if(_options.operation_object) {
283 constexpr uint16_t current_depth = 2;
287 _options.max_mapping_depth - current_depth );
292 _options.max_mapping_depth - current_depth );
298 std::string trx_id =
"";
325 transfer_from = o.
from;
356 fee_asset = o.fee.asset_id;
357 fee_amount = o.fee.amount;
407 void elasticsearch_plugin_impl::add_elasticsearch(
const account_id_type& account_id,
409 uint32_t block_number )
417 obj.operation_id = oho->
id;
418 obj.account = account_id;
419 obj.
sequence = stats_obj.total_ops + 1;
420 obj.next = stats_obj.most_recent_op;
428 if( block_number > _options.start_es_after_block )
430 bulk_line_struct.account_history = ath;
435 bulk_header[
"_index"] = index_name;
436 if( !is_es_version_7_or_above )
437 bulk_header[
"_type"] =
"_doc";
438 bulk_header[
"_id"] = std::string( ath.id );
440 std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk_lines));
442 approximate_bulk_size += bulk_lines.back().size();
444 if( bulk_lines.size() >= limit_documents
446 send_bulk( block_number );
448 cleanObjects(ath, account_id);
452 const account_id_type& account_id )
457 const auto &by_seq_idx = his_idx.
indices().get<by_seq>();
458 auto itr = by_seq_idx.lower_bound(boost::make_tuple(account_id, 0));
459 if (itr != by_seq_idx.end() && itr->account == account_id && itr->id != ath.
id) {
461 const auto remove_op_id = itr->operation_id;
462 const auto itr_remove = itr;
467 if( itr != by_seq_idx.end() && itr->account == account_id )
470 obj.
next = account_history_id_type();
474 const auto &by_opid_idx = his_idx.indices().get<by_opid>();
475 if (by_opid_idx.find(remove_op_id) == by_opid_idx.end()) {
476 db.
remove(remove_op_id(db));
494 return "elasticsearch";
498 return "Stores account history data in elasticsearch database(EXPERIMENTAL).";
502 boost::program_options::options_description& cli,
503 boost::program_options::options_description& cfg
507 (
"elasticsearch-node-url", boost::program_options::value<std::string>(),
508 "Elastic Search database node url(http://localhost:9200/)")
509 (
"elasticsearch-basic-auth", boost::program_options::value<std::string>(),
510 "Pass basic auth to elasticsearch database('')")
511 (
"elasticsearch-bulk-replay", boost::program_options::value<uint32_t>(),
512 "Number of bulk documents to index on replay(10000)")
513 (
"elasticsearch-bulk-sync", boost::program_options::value<uint32_t>(),
514 "Number of bulk documents to index on a syncronied chain(100)")
515 (
"elasticsearch-index-prefix", boost::program_options::value<std::string>(),
516 "Add a prefix to the index(bitshares-)")
517 (
"elasticsearch-max-mapping-depth", boost::program_options::value<uint16_t>(),
518 "The maximum index mapping depth (index.mapping.depth.limit) setting in ES, " 519 "should be >=2. (20)")
520 (
"elasticsearch-start-es-after-block", boost::program_options::value<uint32_t>(),
521 "Start doing ES job after block(0)")
522 (
"elasticsearch-visitor", boost::program_options::value<bool>(),
523 "Use visitor to index additional data(slows down the replay(false))")
524 (
"elasticsearch-operation-object", boost::program_options::value<bool>(),
525 "Save operation as object(true)")
526 (
"elasticsearch-operation-string", boost::program_options::value<bool>(),
527 "Save operation as string. Needed to serve history api calls(false)")
528 (
"elasticsearch-mode", boost::program_options::value<uint16_t>(),
529 "Mode of operation: only_save(0), only_query(1), all(2) - Default: 0")
534 void detail::elasticsearch_plugin_impl::init_program_options(
const boost::program_options::variables_map& options)
536 _options.init( options );
538 if( _options.visitor )
541 es = std::make_unique<graphene::utilities::es_client>( _options.elasticsearch_url, _options.auth );
543 FC_ASSERT( es->check_status(),
"ES database is not up in url ${url}", (
"url", _options.elasticsearch_url) );
545 es->check_version_7_or_above( is_es_version_7_or_above );
548 void detail::elasticsearch_plugin_impl::plugin_options::init(
const boost::program_options::variables_map& options)
561 FC_ASSERT( max_mapping_depth >= 2,
"The minimum value of elasticsearch-max-mapping-depth is 2" );
563 auto es_mode =
static_cast<uint16_t
>( elasticsearch_mode );
565 if( es_mode > static_cast<uint16_t>(
mode::all ) )
566 FC_THROW_EXCEPTION( graphene::chain::plugin_exception,
"Elasticsearch mode not valid" );
567 elasticsearch_mode =
static_cast<mode>( es_mode );
569 if(
mode::all == elasticsearch_mode && !operation_string )
572 "If elasticsearch-mode is set to all then elasticsearch-operation-string need to be true");
578 my->init_program_options( options );
587 my->update_account_histories(b);
601 const auto operation_id = source[
"account_history"][
"operation_id"];
620 const string operation_id_string = std::string(
object_id_type(
id));
622 const string query = R
"( 627 "account_history.operation_id": ")" + operation_id_string + R"(" 633 const auto response = my->es->query( my->_options.index_prefix +
"*/_doc/_search", query );
635 const auto source = variant_response[
"hits"][
"hits"][size_t(0)][
"_source"];
636 return fromEStoOperation(source);
640 const account_id_type& account_id,
641 const operation_history_id_type& stop,
643 const operation_history_id_type& start )
const 645 const auto account_id_string = std::string( account_id );
647 const auto stop_number = stop.instance.value;
648 const auto start_number = start.instance.value;
653 else if(stop_number > 0)
657 const string query = R
"( 660 "sort" : [{ "operation_id_num" : {"order" : "desc"}}], 666 "query": "account_history.account: )" + account_id_string + range + R"(" 675 vector<operation_history_object> result; 677 if( !my->es->check_status() )
680 const auto response = my->es->query( my->_options.index_prefix +
"*/_doc/_search", query );
684 const auto hits = variant_response[
"hits"][
"total"];
686 if( hits.is_object() )
687 size = hits[
"value"].as_uint64();
690 size = std::min( size,
size_t(limit) );
692 const auto& data = variant_response[
"hits"][
"hits"];
693 for(
size_t i=0; i<size; ++i )
695 const auto& source = data[i][
"_source"];
696 result.push_back(fromEStoOperation(source));
703 return my->_options.elasticsearch_mode;
static string to_string(const variant &v, output_formatting format=stringify_large_ints_and_doubles, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
static fc::variant adapt_static_variant(const fc::variants &v, uint16_t max_depth)
account_id_type transfer_to
account_history_id_type most_recent_op
void modify(const T &obj, const Lambda &m)
elasticsearch_plugin(graphene::app::application &app)
~elasticsearch_plugin() override
Wraps a derived index to intercept calls to create, modify, and remove so that callbacks may be fired...
void plugin_initialize(const boost::program_options::variables_map &options) override
Perform early startup routines and register plugin indexes, callbacks, etc.
std::string to_iso_string() const
const index_type & indices() const
fc::signal< void(const signed_block &)> applied_block
asset_id_type receives_asset_id
#define GRAPHENE_MAX_NESTED_OBJECTS
Tracks the history of all logical operations on blockchain state. All operations and virtual operations...
tracks the blockchain state in an extensible manner
const IndexType & get_index_type() const
asset_id_type pays_asset_id
object_id_type fill_order_id
std::string plugin_name() const override
Get the name of the plugin.
std::string receives_asset_name
mode get_running_mode() const
uint64_t as_uint64() const
share_type transfer_amount
void get_program_option(const boost::program_options::variables_map &from, const std::string &key, T &to)
std::vector< std::string > createBulk(const fc::mutable_variant_object &bulk_header, std::string &&data)
account_id_type fee_payer
account_history_id_type next
the operation position within the given account
void operator()(const T &o)
share_type receives_amount
transfer_struct transfer_data
const vector< optional< operation_history_object > > & get_applied_operations() const
void operator()(const graphene::chain::fill_order_operation &o)
Provides a stack-based nullable value similar to boost::optional.
void plugin_set_program_options(boost::program_options::options_description &cli, boost::program_options::options_description &cfg) override
Fill in command line parameters used by the plugin.
fc::time_point_sec block_time
variant_object & get_object()
std::string operation_result
static share_type scaled_precision(uint8_t precision)
static constexpr size_t request_size_threshold
When doing bulk operations, call send_bulk when the approximate size of pending data reaches this val...
Transfers an amount of one asset from one account to another.
void to_variant(const flat_set< T, A... > &var, variant &vo, uint32_t _max_depth)
chain::database & database()
microseconds seconds(int64_t s)
account_id_type account_id
A node in a linked list of operation_history_objects. Account history is important for users and wallet...
asset_id_type fill_pays_asset_id
account_id_type result_type
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
static fc::variant adapt(const fc::variant_object &op, uint16_t max_depth)
Stores null, int64, uint64, double, bool, string, std::vector<variant>, and variant_objects.
std::string plugin_description() const override
Get the description of the plugin.
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
asset_id_type transfer_asset_id
std::string pays_asset_name
void plugin_startup() override
Begin normal runtime operations.
account_id_type operator()(const OpType &op) const
variant operation_result_object
account_id_type account_id
share_type fill_pays_amount
asset amount
The amount of asset to transfer from from to to.
void from_variant(const variant &var, flat_set< T, A... > &vo, uint32_t _max_depth)
std::string to_string(double)
double receives_amount_units
elasticsearch_plugin_impl(elasticsearch_plugin &_plugin)
const account_statistics_object & get_account_stats_by_owner(account_id_type owner) const
account_id_type from
Account to transfer asset from.
account_id_type transfer_from
static variant from_string(const string &utf8_str, parse_type ptype=legacy_parser, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
void remove(const object &obj)
vector< operation_history_object > get_account_history(const account_id_type &account_id, const operation_history_id_type &stop=operation_history_id_type(), uint64_t limit=100, const operation_history_id_type &start=operation_history_id_type()) const
#define FC_PACK_MAX_DEPTH
#define FC_CAPTURE_LOG_AND_RETHROW(...)
vector< processed_transaction > transactions
operation_history_object get_operation_by_id(const operation_history_id_type &id) const
void operation_get_required_authorities(const operation &op, flat_set< account_id_type > &active, flat_set< account_id_type > &owner, vector< authority > &other, bool ignore_custom_operation_required_auths)
void operation_get_impacted_accounts(const operation &op, flat_set< account_id_type > &result, bool ignore_custom_op_required_auths)
account_id_type to
Account to transfer asset to.
const T & create(F &&constructor)
An order-preserving dictionary of variants.
account_id_type fill_account_id
share_type fill_receives_amount
void operator()(const graphene::chain::transfer_operation &o)
asset_id_type fill_receives_asset_id