BitShares-Core  5.0.0
BitShares blockchain implementation and command-line interface software
market_history_plugin.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Cryptonomex, Inc., and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 
26 
35 
36 #include <fc/thread/thread.hpp>
37 
38 namespace graphene { namespace market_history {
39 
40 namespace detail
41 {
42 
44 {
45  public:
47  :_self( _plugin ) {}
49 
53  void update_market_histories( const signed_block& b );
54 
56  {
57  return _self.database();
58  }
59 
61  flat_set<uint32_t> _tracked_buckets;
65 };
66 
67 
69 {
73 
75  :_plugin(mhp),_now(n),_meta(meta) {}
76 
77  typedef void result_type;
78 
80  template<typename T>
81  void operator()( const T& )const{}
82 
83  void operator()( const fill_order_operation& o )const
84  {
85  //ilog( "processing ${o}", ("o",o) );
86  auto& db = _plugin.database();
87  const auto& order_his_idx = db.get_index_type<history_index>().indices();
88  const auto& history_idx = order_his_idx.get<by_key>();
89  const auto& his_time_idx = order_his_idx.get<by_market_time>();
90 
91  // To save new filled order data
92  history_key hkey;
93  hkey.base = o.pays.asset_id;
94  hkey.quote = o.receives.asset_id;
95  if( hkey.base > hkey.quote )
96  std::swap( hkey.base, hkey.quote );
97  hkey.sequence = std::numeric_limits<int64_t>::min();
98 
99  auto itr = history_idx.lower_bound( hkey );
100 
101  if( itr != history_idx.end() && itr->key.base == hkey.base && itr->key.quote == hkey.quote )
102  hkey.sequence = itr->key.sequence - 1;
103  else
104  hkey.sequence = 0;
105 
106  const auto& new_order_his_obj = db.create<order_history_object>( [&]( order_history_object& ho ) {
107  ho.key = hkey;
108  ho.time = _now;
109  ho.op = o;
110  });
111 
112  // save a reference to market ticker meta object
113  if( _meta == nullptr )
114  {
115  const auto& meta_idx = db.get_index_type<simple_index<market_ticker_meta_object>>();
116  if( meta_idx.size() == 0 )
117  _meta = &db.create<market_ticker_meta_object>( [&]( market_ticker_meta_object& mtm ) {
118  mtm.rolling_min_order_his_id = new_order_his_obj.id;
119  mtm.skip_min_order_his_id = false;
120  });
121  else
122  _meta = &( *meta_idx.begin() );
123  }
124 
125  // To remove old filled order data
126  const auto max_records = _plugin.max_order_his_records_per_market();
127  hkey.sequence += max_records;
128  itr = history_idx.lower_bound( hkey );
129  if( itr != history_idx.end() && itr->key.base == hkey.base && itr->key.quote == hkey.quote )
130  {
131  const auto max_seconds = _plugin.max_order_his_seconds_per_market();
132  fc::time_point_sec min_time;
133  if( min_time + max_seconds < _now )
134  min_time = _now - max_seconds;
135  auto time_itr = his_time_idx.lower_bound( std::make_tuple( hkey.base, hkey.quote, min_time ) );
136  if( time_itr != his_time_idx.end() && time_itr->key.base == hkey.base && time_itr->key.quote == hkey.quote )
137  {
138  if( itr->key.sequence >= time_itr->key.sequence )
139  {
140  while( itr != history_idx.end() && itr->key.base == hkey.base && itr->key.quote == hkey.quote )
141  {
142  auto old_itr = itr;
143  ++itr;
144  db.remove( *old_itr );
145  }
146  }
147  else
148  {
149  while( time_itr != his_time_idx.end() && time_itr->key.base == hkey.base && time_itr->key.quote == hkey.quote )
150  {
151  auto old_itr = time_itr;
152  ++time_itr;
153  db.remove( *old_itr );
154  }
155  }
156  }
157  }
158 
159  // To update ticker data and buckets data, only update for maker orders
160  if( !o.is_maker )
161  return;
162 
163  bucket_key key;
164  key.base = o.pays.asset_id;
165  key.quote = o.receives.asset_id;
166 
167  price trade_price = o.pays / o.receives;
168 
169  if( key.base > key.quote )
170  {
171  std::swap( key.base, key.quote );
172  trade_price = ~trade_price;
173  }
174 
175  price fill_price = o.fill_price;
176  if( fill_price.base.asset_id > fill_price.quote.asset_id )
177  fill_price = ~fill_price;
178 
179  // To update ticker data
180  const auto& ticker_idx = db.get_index_type<market_ticker_index>().indices().get<by_market>();
181  auto ticker_itr = ticker_idx.find( std::make_tuple( key.base, key.quote ) );
182  if( ticker_itr == ticker_idx.end() )
183  {
184  db.create<market_ticker_object>( [&]( market_ticker_object& mt ) {
185  mt.base = key.base;
186  mt.quote = key.quote;
187  mt.last_day_base = 0;
188  mt.last_day_quote = 0;
189  mt.latest_base = fill_price.base.amount;
190  mt.latest_quote = fill_price.quote.amount;
191  mt.base_volume = trade_price.base.amount.value;
192  mt.quote_volume = trade_price.quote.amount.value;
193  });
194  }
195  else
196  {
197  db.modify( *ticker_itr, [&]( market_ticker_object& mt ) {
198  mt.latest_base = fill_price.base.amount;
199  mt.latest_quote = fill_price.quote.amount;
200  mt.base_volume += trade_price.base.amount.value; // ignore overflow
201  mt.quote_volume += trade_price.quote.amount.value; // ignore overflow
202  });
203  }
204 
205  // To update buckets data
206  const auto max_history = _plugin.max_history();
207  if( max_history == 0 ) return;
208 
209  const auto& buckets = _plugin.tracked_buckets();
210  if( buckets.size() == 0 ) return;
211 
212  const auto& bucket_idx = db.get_index_type<bucket_index>();
213  for( auto bucket : buckets )
214  {
215  auto bucket_num = _now.sec_since_epoch() / bucket;
216  fc::time_point_sec cutoff;
217  if( bucket_num > max_history )
218  cutoff = cutoff + ( bucket * ( bucket_num - max_history ) );
219 
220  key.seconds = bucket;
221  key.open = fc::time_point_sec() + ( bucket_num * bucket );
222 
223  const auto& by_key_idx = bucket_idx.indices().get<by_key>();
224  auto bucket_itr = by_key_idx.find( key );
225  if( bucket_itr == by_key_idx.end() )
226  { // create new bucket
227  /* const auto& obj = */
228  db.create<bucket_object>( [&]( bucket_object& b ){
229  b.key = key;
230  b.base_volume = trade_price.base.amount;
231  b.quote_volume = trade_price.quote.amount;
232  b.open_base = fill_price.base.amount;
233  b.open_quote = fill_price.quote.amount;
234  b.close_base = fill_price.base.amount;
235  b.close_quote = fill_price.quote.amount;
236  b.high_base = b.close_base;
237  b.high_quote = b.close_quote;
238  b.low_base = b.close_base;
239  b.low_quote = b.close_quote;
240  });
241  //wlog( " creating bucket ${b}", ("b",obj) );
242  }
243  else
244  { // update existing bucket
245  //wlog( " before updating bucket ${b}", ("b",*bucket_itr) );
246  db.modify( *bucket_itr, [&]( bucket_object& b ){
247  try {
248  b.base_volume += trade_price.base.amount;
249  } catch( fc::overflow_exception& ) {
250  b.base_volume = std::numeric_limits<int64_t>::max();
251  }
252  try {
253  b.quote_volume += trade_price.quote.amount;
254  } catch( fc::overflow_exception& ) {
255  b.quote_volume = std::numeric_limits<int64_t>::max();
256  }
257  b.close_base = fill_price.base.amount;
258  b.close_quote = fill_price.quote.amount;
259  if( b.high() < fill_price )
260  {
261  b.high_base = b.close_base;
262  b.high_quote = b.close_quote;
263  }
264  if( b.low() > fill_price )
265  {
266  b.low_base = b.close_base;
267  b.low_quote = b.close_quote;
268  }
269  });
270  //wlog( " after bucket bucket ${b}", ("b",*bucket_itr) );
271  }
272 
273  {
274  key.open = fc::time_point_sec();
275  bucket_itr = by_key_idx.lower_bound( key );
276 
277  while( bucket_itr != by_key_idx.end() &&
278  bucket_itr->key.base == key.base &&
279  bucket_itr->key.quote == key.quote &&
280  bucket_itr->key.seconds == bucket &&
281  bucket_itr->key.open < cutoff )
282  {
283  // elog( " removing old bucket ${b}", ("b", *bucket_itr) );
284  auto old_bucket_itr = bucket_itr;
285  ++bucket_itr;
286  db.remove( *old_bucket_itr );
287  }
288  }
289  }
290  }
291 };
292 
294 {}
295 
297 {
299  const market_ticker_meta_object* _meta = nullptr;
300  const auto& meta_idx = db.get_index_type<simple_index<market_ticker_meta_object>>();
301  if( meta_idx.size() > 0 )
302  _meta = &( *meta_idx.begin() );
303  const vector<optional< operation_history_object > >& hist = db.get_applied_operations();
304  for( const optional< operation_history_object >& o_op : hist )
305  {
306  if( o_op.valid() )
307  {
308  try
309  {
310  o_op->op.visit( operation_process_fill_order( _self, b.timestamp, _meta ) );
311  } FC_CAPTURE_AND_LOG( (o_op) )
312  }
313  }
314  // roll out expired data from ticker
315  if( _meta != nullptr )
316  {
317  time_point_sec last_day = b.timestamp - 86400;
318  object_id_type last_min_his_id = _meta->rolling_min_order_his_id;
319  bool skip = _meta->skip_min_order_his_id;
320 
321  const auto& ticker_idx = db.get_index_type<market_ticker_index>().indices().get<by_market>();
322  const auto& history_idx = db.get_index_type<history_index>().indices().get<by_id>();
323  auto history_itr = history_idx.lower_bound( _meta->rolling_min_order_his_id );
324  while( history_itr != history_idx.end() && history_itr->time < last_day )
325  {
326  const fill_order_operation& o = history_itr->op;
327  if( skip && history_itr->id == _meta->rolling_min_order_his_id )
328  skip = false;
329  else if( o.is_maker )
330  {
331  bucket_key key;
332  key.base = o.pays.asset_id;
333  key.quote = o.receives.asset_id;
334 
335  price trade_price = o.pays / o.receives;
336 
337  if( key.base > key.quote )
338  {
339  std::swap( key.base, key.quote );
340  trade_price = ~trade_price;
341  }
342 
343  price fill_price = o.fill_price;
344  if( fill_price.base.asset_id > fill_price.quote.asset_id )
345  fill_price = ~fill_price;
346 
347  auto ticker_itr = ticker_idx.find( std::make_tuple( key.base, key.quote ) );
348  if( ticker_itr != ticker_idx.end() ) // should always be true
349  {
350  db.modify( *ticker_itr, [&]( market_ticker_object& mt ) {
351  mt.last_day_base = fill_price.base.amount;
352  mt.last_day_quote = fill_price.quote.amount;
353  mt.base_volume -= trade_price.base.amount.value; // ignore underflow
354  mt.quote_volume -= trade_price.quote.amount.value; // ignore underflow
355  });
356  }
357  }
358  last_min_his_id = history_itr->id;
359  ++history_itr;
360  }
361  // update meta
362  if( history_itr != history_idx.end() ) // if still has some data rolling
363  {
364  if( history_itr->id != _meta->rolling_min_order_his_id ) // if rolled out some
365  {
366  db.modify( *_meta, [&]( market_ticker_meta_object& mtm ) {
367  mtm.rolling_min_order_his_id = history_itr->id;
368  mtm.skip_min_order_his_id = false;
369  });
370  }
371  }
372  else // if all data are rolled out
373  {
374  if( !_meta->skip_min_order_his_id
375  || last_min_his_id != _meta->rolling_min_order_his_id ) // if rolled out some
376  {
377  db.modify( *_meta, [&]( market_ticker_meta_object& mtm ) {
378  mtm.rolling_min_order_his_id = last_min_his_id;
379  mtm.skip_min_order_his_id = true;
380  });
381  }
382  }
383  }
384 }
385 
386 } // end namespace detail
387 
388 
389 
390 
391 
392 
394  my( new detail::market_history_plugin_impl(*this) )
395 {
396 }
397 
399 {
400 }
401 
403 {
404  return "market_history";
405 }
406 
408  boost::program_options::options_description& cli,
409  boost::program_options::options_description& cfg
410  )
411 {
412  cli.add_options()
413  ("bucket-size", boost::program_options::value<string>()->default_value("[60,300,900,1800,3600,14400,86400]"),
414  "Track market history by grouping orders into buckets of equal size measured in seconds specified as a JSON array of numbers")
415  ("history-per-size", boost::program_options::value<uint32_t>()->default_value(1000),
416  "How far back in time to track history for each bucket size, measured in the number of buckets (default: 1000)")
417  ("max-order-his-records-per-market", boost::program_options::value<uint32_t>()->default_value(1000),
418  "Will only store this amount of matched orders for each market in order history for querying, or those meet the other option, which has more data (default: 1000)")
419  ("max-order-his-seconds-per-market", boost::program_options::value<uint32_t>()->default_value(259200),
420  "Will only store matched orders in last X seconds for each market in order history for querying, or those meet the other option, which has more data (default: 259200 (3 days))")
421  ;
422  cfg.add(cli);
423 }
424 
425 void market_history_plugin::plugin_initialize(const boost::program_options::variables_map& options)
426 { try {
427  database().applied_block.connect( [this]( const signed_block& b){ my->update_market_histories(b); } );
432 
433  if( options.count( "bucket-size" ) )
434  {
435  const std::string& buckets = options["bucket-size"].as<string>();
436  my->_tracked_buckets = fc::json::from_string(buckets).as<flat_set<uint32_t>>(2);
437  my->_tracked_buckets.erase( 0 );
438  }
439  if( options.count( "history-per-size" ) )
440  my->_maximum_history_per_bucket_size = options["history-per-size"].as<uint32_t>();
441  if( options.count( "max-order-his-records-per-market" ) )
442  my->_max_order_his_records_per_market = options["max-order-his-records-per-market"].as<uint32_t>();
443  if( options.count( "max-order-his-seconds-per-market" ) )
444  my->_max_order_his_seconds_per_market = options["max-order-his-seconds-per-market"].as<uint32_t>();
446 
448 {
449 }
450 
451 const flat_set<uint32_t>& market_history_plugin::tracked_buckets() const
452 {
453  return my->_tracked_buckets;
454 }
455 
457 {
458  return my->_maximum_history_per_bucket_size;
459 }
460 
462 {
463  return my->_max_order_his_records_per_market;
464 }
465 
467 {
468  return my->_max_order_his_seconds_per_market;
469 }
470 
471 } }
void modify(const T &obj, const Lambda &m)
Wraps a derived index to intercept calls to create, modify, and remove so that callbacks may be fired...
Definition: index.hpp:309
T as(uint32_t max_depth) const
Definition: variant.hpp:336
virtual void plugin_set_program_options(boost::program_options::options_description &cli, boost::program_options::options_description &cfg) override
Fill in command line parameters used by the plugin.
fc::signal< void(const signed_block &)> applied_block
Definition: database.hpp:197
A simple index uses a vector<unique_ptr<T>> to store data.
uint32_t sec_since_epoch() const
Definition: time.hpp:90
tracks the blockchain state in an extensible manner
Definition: database.hpp:70
Definition: api.cpp:56
const flat_set< uint32_t > & tracked_buckets() const
#define FC_CAPTURE_AND_LOG(...)
Definition: exception.hpp:437
provides stack-based nullable value similar to boost::optional
Definition: optional.hpp:20
virtual void plugin_startup() override
Begin normal runtime operations.
const vector< optional< operation_history_object > > & get_applied_operations() const
Definition: db_block.cpp:548
chain::database & database()
Definition: plugin.hpp:114
The price struct stores asset prices in the BitShares system.
Definition: asset.hpp:114
virtual void plugin_initialize(const boost::program_options::variables_map &options) override
Perform early startup routines and register plugin indexes, callbacks, etc.
#define FC_CAPTURE_AND_RETHROW(...)
Definition: exception.hpp:478
const object & get(object_id_type id) const
Definition: index.hpp:111
asset_id_type asset_id
Definition: asset.hpp:39
static variant from_string(const string &utf8_str, parse_type ptype=legacy_parser, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
Definition: json.cpp:458
operation_process_fill_order(market_history_plugin &mhp, fc::time_point_sec n, const market_ticker_meta_object *&meta)
const IndexType & get_index_type() const
T value
Definition: safe.hpp:22