38 template<u
int8_t SpaceID, u
int8_t TypeID>
63 : enabled(e), store_updates(su), no_delete(nd), index_name(in)
67 bool store_updates =
false;
68 bool no_delete =
false;
69 string index_name =
"";
71 std::string elasticsearch_url =
"http://localhost:9200/";
72 std::string auth =
"";
73 uint32_t bulk_replay = 10000;
74 uint32_t bulk_sync = 100;
84 std::string index_prefix =
"objects-";
88 uint16_t max_mapping_depth = 10;
90 uint32_t start_es_after_block = 0;
91 bool sync_db_on_startup =
false;
93 void init(
const boost::program_options::variables_map& options);
96 enum class action_type
/// Forwards the given object ids to index_database(), tagged as action_type::insertion.
/// @param ids ids of the objects to be indexed
103 void on_objects_create(
const vector<object_id_type>& ids)
104 { index_database( ids, action_type::insertion ); }
/// Forwards the given object ids to index_database(), tagged as action_type::update.
/// @param ids ids of the objects to be indexed
106 void on_objects_update(
const vector<object_id_type>& ids)
107 { index_database( ids, action_type::update ); }
/// Forwards the given object ids to index_database(), tagged as action_type::deletion.
/// @param ids ids of the objects passed on for processing as deletions
109 void on_objects_delete(
const vector<object_id_type>& ids)
110 { index_database( ids, action_type::deletion ); }
112 void index_database(
const vector<object_id_type>& ids, action_type action);
114 void sync_db(
bool delete_before_load =
false );
121 plugin_options _options;
123 uint32_t limit_documents = _options.bulk_replay;
125 uint64_t docs_sent_batch = 0;
126 uint64_t docs_sent_total = 0;
128 std::unique_ptr<graphene::utilities::es_client> es;
130 vector<std::string> bulk_lines;
131 size_t approximate_bulk_size = 0;
133 uint32_t block_number = 0;
135 bool is_es_version_7_or_above =
true;
140 void init_program_options(
const boost::program_options::variables_map& options);
142 void send_bulk_if_ready(
bool force =
false );
151 : my(_my), db( my->_self.
database() )
155 template<
typename ObjType>
157 bool force_delete =
false )
165 ilog(
"Deleting all data in index " + my->_options.index_prefix + opt.
index_name );
166 my->delete_all_from_database( opt );
169 ilog(
"Loading data into index " + my->_options.index_prefix + opt.
index_name );
172 my->prepareTemplate( static_cast<const ObjType&>(o), opt );
174 my->send_bulk_if_ready(
true);
175 my->docs_sent_batch = 0;
179 void es_objects_plugin_impl::sync_db(
bool delete_before_load )
181 ilog(
"elasticsearch OBJECTS: loading data from the object database (chain state)");
198 ilog(
"elasticsearch OBJECTS: done loading data from the object database (chain state)");
201 void es_objects_plugin_impl::index_database(
const vector<object_id_type>& ids, action_type action)
207 if( block_number <= _options.start_es_after_block )
214 limit_documents = _options.bulk_sync;
216 limit_documents = _options.bulk_replay;
218 bulk_lines.reserve(limit_documents);
220 static const unordered_map<uint16_t,plugin_options::object_options&> data_type_map = {
221 { account_id_type::space_type, _options.accounts },
222 { account_balance_id_type::space_type, _options.balances },
223 { asset_id_type::space_type, _options.assets },
224 { asset_bitasset_data_id_type::space_type, _options.asset_bitasset },
225 { limit_order_id_type::space_type, _options.limit_orders },
226 { proposal_id_type::space_type, _options.proposals },
227 { budget_record_id_type::space_type, _options.budget }
230 for(
const auto& value: ids )
232 const auto itr = data_type_map.find( value.space_type() );
233 if( itr == data_type_map.end() || !(itr->second.enabled) )
235 const auto& opt = itr->second;
236 if( action_type::deletion == action )
237 delete_from_database( value, opt );
242 case account_id_type::space_type:
245 case account_balance_id_type::space_type:
248 case asset_id_type::space_type:
251 case asset_bitasset_data_id_type::space_type:
254 case limit_order_id_type::space_type:
257 case proposal_id_type::space_type:
260 case budget_record_id_type::space_type:
271 void es_objects_plugin_impl::delete_from_database(
278 delete_line[
"_id"] = string(
id);
279 delete_line[
"_index"] = _options.index_prefix + opt.
index_name;
280 if( !is_es_version_7_or_above )
281 delete_line[
"_type"] =
"_doc";
283 final_delete_line[
"delete"] = std::move( delete_line );
287 approximate_bulk_size += bulk_lines.back().size();
289 send_bulk_if_ready();
292 void es_objects_plugin_impl::delete_all_from_database(
const plugin_options::object_options& opt )
const 300 es->query( _options.index_prefix + opt.index_name +
"/_delete_by_query", R
"({"query":{"match_all":{}}})" ); 304 void es_objects_plugin_impl::prepareTemplate(
308 bulk_header[
"_index"] = _options.index_prefix + opt.
index_name;
309 if( !is_es_version_7_or_above )
310 bulk_header[
"_type"] =
"_doc";
313 bulk_header[
"_id"] = string(blockchain_object.id);
319 _options.max_mapping_depth ) );
321 o[
"object_id"] = string(blockchain_object.id);
322 o[
"block_time"] = block_time;
323 o[
"block_number"] = block_number;
328 std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk_lines));
330 approximate_bulk_size += bulk_lines.back().size();
332 send_bulk_if_ready();
335 void es_objects_plugin_impl::send_bulk_if_ready(
bool force )
337 if( bulk_lines.empty() )
339 if( !force && bulk_lines.size() < limit_documents
342 constexpr uint32_t log_count_threshold = 20000;
343 constexpr uint32_t log_time_threshold = 3600;
344 static uint64_t next_log_count = log_count_threshold;
346 docs_sent_batch += bulk_lines.size();
347 docs_sent_total += bulk_lines.size();
348 bool log_by_next = ( docs_sent_total >= next_log_count ||
fc::time_point::now() >= next_log_time );
349 if( log_by_next || limit_documents == _options.bulk_replay || force )
351 ilog(
"Sending ${n} lines of bulk data to ElasticSearch at block ${blk}, " 352 "this batch ${b}, total ${t}, approximate size ${s}",
353 (
"n",bulk_lines.size())(
"blk",block_number)
354 (
"b",docs_sent_batch)(
"t",docs_sent_total)(
"s",approximate_bulk_size) );
355 next_log_count = docs_sent_total + log_count_threshold;
359 if( !es->send_bulk( bulk_lines ) )
361 elog(
"Error sending ${n} lines of bulk data to ElasticSearch, the first lines are:",
362 (
"n",bulk_lines.size()) );
363 const auto log_max = std::min( bulk_lines.size(), size_t(10) );
364 for(
size_t i = 0; i < log_max; ++i )
366 edump( (bulk_lines[i]) );
369 "Error populating ES database, we are going to keep trying." );
372 bulk_lines.reserve(limit_documents);
373 approximate_bulk_size = 0;
380 my(
std::make_unique<detail::es_objects_plugin_impl>(*this) )
393 return "Stores blockchain objects in ES database. Experimental.";
397 boost::program_options::options_description& cli,
398 boost::program_options::options_description& cfg
402 (
"es-objects-elasticsearch-url", boost::program_options::value<std::string>(),
403 "Elasticsearch node url(http://localhost:9200/)")
404 (
"es-objects-auth", boost::program_options::value<std::string>(),
"Basic auth username:password('')")
405 (
"es-objects-bulk-replay", boost::program_options::value<uint32_t>(),
406 "Number of bulk documents to index on replay(10000)")
407 (
"es-objects-bulk-sync", boost::program_options::value<uint32_t>(),
408 "Number of bulk documents to index on a synchronized chain(100)")
410 (
"es-objects-proposals", boost::program_options::value<bool>(),
"Store proposal objects (true)")
411 (
"es-objects-proposals-store-updates", boost::program_options::value<bool>(),
412 "Store all updates to the proposal objects (false)")
413 (
"es-objects-proposals-no-delete", boost::program_options::value<bool>(),
414 "Do not delete a proposal from ES even if it is deleted from chain state. " 415 "It is implicitly true and can not be set to false if es-objects-proposals-store-updates is true. " 418 (
"es-objects-accounts", boost::program_options::value<bool>(),
"Store account objects (true)")
419 (
"es-objects-accounts-store-updates", boost::program_options::value<bool>(),
420 "Store all updates to the account objects (false)")
422 (
"es-objects-assets", boost::program_options::value<bool>(),
"Store asset objects (true)")
423 (
"es-objects-assets-store-updates", boost::program_options::value<bool>(),
424 "Store all updates to the asset objects (false)")
426 (
"es-objects-balances", boost::program_options::value<bool>(),
"Store account balances (true)")
427 (
"es-objects-balances-store-updates", boost::program_options::value<bool>(),
428 "Store all updates to the account balances (false)")
430 (
"es-objects-limit-orders", boost::program_options::value<bool>(),
"Store limit order objects (true)")
431 (
"es-objects-limit-orders-store-updates", boost::program_options::value<bool>(),
432 "Store all updates to the limit orders (false)")
433 (
"es-objects-limit-orders-no-delete", boost::program_options::value<bool>(),
434 "Do not delete a limit order object from ES even if it is deleted from chain state. " 435 "It is implicitly true and can not be set to false if es-objects-limit-orders-store-updates is true. " 438 (
"es-objects-asset-bitasset", boost::program_options::value<bool>(),
439 "Store bitasset data, including price feeds (true)")
440 (
"es-objects-asset-bitasset-store-updates", boost::program_options::value<bool>(),
441 "Store all updates to the bitasset data (false)")
443 (
"es-objects-budget-records", boost::program_options::value<bool>(),
"Store budget records (true)")
445 (
"es-objects-index-prefix", boost::program_options::value<std::string>(),
446 "Add a prefix to the index(objects-)")
447 (
"es-objects-max-mapping-depth", boost::program_options::value<uint16_t>(),
448 "Can not exceed the maximum index mapping depth (index.mapping.depth.limit) setting in ES, " 449 "and need to be even smaller to not trigger the index.mapping.total_fields.limit error (10)")
450 (
"es-objects-keep-only-current", boost::program_options::value<bool>(),
451 "Deprecated. Please use the store-updates or no-delete options. " 452 "Keep only current state of the objects(true)")
453 (
"es-objects-start-es-after-block", boost::program_options::value<uint32_t>(),
454 "Start doing ES job after block(0)")
455 (
"es-objects-sync-db-on-startup", boost::program_options::value<bool>(),
456 "Copy all applicable objects from the object database (chain state) to ES on program startup (false)")
461 void detail::es_objects_plugin_impl::init_program_options(
const boost::program_options::variables_map& options)
463 _options.init( options );
465 es = std::make_unique<graphene::utilities::es_client>( _options.elasticsearch_url, _options.auth );
467 FC_ASSERT( es->check_status(),
"ES database is not up in url ${url}", (
"url", _options.elasticsearch_url) );
469 es->check_version_7_or_above( is_es_version_7_or_above );
472 void detail::es_objects_plugin_impl::plugin_options::init(
const boost::program_options::variables_map& options)
501 my->init_program_options( options );
504 const flat_set<account_id_type>& ) {
505 my->on_objects_create( ids );
508 const flat_set<account_id_type>& ) {
509 my->on_objects_update( ids );
512 const vector<const object*>&,
const flat_set<account_id_type>& ) {
513 my->on_objects_delete( ids );
520 if( 0 ==
database().head_block_num() )
522 else if( my->_options.sync_db_on_startup )
528 my->send_bulk_if_ready(
true);
static string to_string(const variant &v, output_formatting format=stringify_large_ints_and_doubles, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
Tracks the balance of a single account/asset pair. This object is indexed on owner and asset_type so th...
void plugin_initialize(const boost::program_options::variables_map &options) override
Perform early startup routines and register plugin indexes, callbacks, etc.
fc::signal< void(const vector< object_id_type > &, const vector< const object * > &, const flat_set< account_id_type > &)> removed_objects
data_loader(es_objects_plugin_impl *_my)
contains properties that only apply to bitassets (market issued assets)
~es_objects_plugin() override
time_point_sec head_block_time() const
graphene::chain::database & db
This class represents an account on the object graph. Accounts are the primary unit of authority on the...
tracks the blockchain state in an extensible manner
#define GRAPHENE_NET_MAX_NESTED_OBJECTS
virtual void inspect_all_objects(std::function< void(const object &)> inspector) const =0
fc::signal< void(const vector< object_id_type > &, const flat_set< account_id_type > &)> changed_objects
void plugin_set_program_options(boost::program_options::options_description &cli, boost::program_options::options_description &cfg) override
Fill in command line parameters used by the plugin.
void get_program_option(const boost::program_options::variables_map &from, const std::string &key, T &to)
es_objects_plugin_impl(es_objects_plugin &_plugin)
void load(const es_objects_plugin_impl::plugin_options::object_options &opt, bool force_delete=false)
std::vector< std::string > createBulk(const fc::mutable_variant_object &bulk_header, std::string &&data)
object_options(bool e, bool su, bool nd, const string &in)
es_objects_plugin_impl * my
variant_object & get_object()
void plugin_startup() override
Begin normal runtime operations.
static constexpr size_t request_size_threshold
When doing bulk operations, call send_bulk when the approximate size of pending data reaches this val...
const T & get(const object_id_type &id) const
void to_variant(const flat_set< T, A... > &var, variant &vo, uint32_t _max_depth)
chain::database & database()
microseconds seconds(int64_t s)
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
static fc::variant adapt(const fc::variant_object &op, uint16_t max_depth)
const index & get_index() const
stores null, int64, uint64, double, bool, string, std::vector<variant>, and variant_object's.
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
void plugin_shutdown() override
Cleanly shut down the plugin.
tracks the parameters of an asset. All assets have a globally unique symbol name that controls how they...
uint32_t head_block_num() const
std::string plugin_description() const override
Get the description of the plugin.
base for all database objects
An order-preserving dictionary of variants.
an offer to sell an amount of an asset at a specified exchange rate by a certain time. The objects are ...
fc::signal< void(const vector< object_id_type > &, const flat_set< account_id_type > &)> new_objects
std::string plugin_name() const override
Get the name of the plugin.
tracks the approval of a partially approved transaction
static constexpr uint16_t space_type