BitShares-Core  4.0.0
BitShares blockchain implementation and command-line interface software
rate_limiting.cpp
Go to the documentation of this file.
4 #include <list>
5 #include <algorithm>
6 #include <fc/network/ip.hpp>
7 #include <fc/fwd_impl.hpp>
8 #include <fc/asio.hpp>
9 #include <fc/log/logger.hpp>
10 #include <fc/io/stdio.hpp>
12 #include <fc/thread/thread.hpp>
13 
14 namespace fc
15 {
16 
17  namespace detail
18  {
19  // data about a read or write we're managing
// Abstract base for one queued, rate-limited socket operation.
//   length             - number of bytes the caller asked to transfer
//   offset             - offset into the buffer; subclasses forward it to the asio call
//   permitted_length   - starts at 0; process_pending_operations() assigns the byte
//                        budget before calling perform_operation()
//   completion_promise - promise the originating readsome/writesome call waits on
// NOTE(review): the class head and the permitted_length / completion_promise member
// declarations are not visible at this point in the file.
21  {
22  public:
23  size_t length;
24  size_t offset;
27 
28  rate_limited_operation(size_t length,
29  size_t offset,
30  promise<size_t>::ptr&& completion_promise) :
31  length(length),
32  offset(offset),
33  permitted_length(0),
// The named rvalue-reference parameter is an lvalue in this expression, so the
// member is copy-constructed from it rather than moved.
34  completion_promise(completion_promise)
35  {}
36 
// Starts the actual transfer of up to permitted_length bytes; implemented by subclasses.
37  virtual void perform_operation() = 0;
38  };
39 
// Rate-limited write on a TCP socket. The data source is either a raw pointer
// (raw_buffer) or a shared buffer (shared_buffer); perform_operation() uses the
// raw pointer when it is non-null and the shared buffer otherwise.
41  {
42  public:
43  boost::asio::ip::tcp::socket& socket;
44  const char* raw_buffer;
45  std::shared_ptr<const char> shared_buffer;
46 
// Raw-pointer constructor. Its body is assert(false), so constructing a write
// operation from a raw pointer trips the assertion whenever asserts are enabled.
// NOTE(review): presumably raw-pointer writes are intentionally unsupported — confirm.
47  rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket& socket,
48  const char* buffer,
49  size_t length,
50  size_t offset,
52  rate_limited_operation(length, offset, std::move(completion_promise)),
53  socket(socket),
54  raw_buffer(buffer)
55  {
56  assert(false);
57  }
// Shared-buffer constructor: raw_buffer is set to nullptr so perform_operation()
// takes the shared_buffer branch.
58  rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket& socket,
59  const std::shared_ptr<const char>& buffer,
60  size_t length,
61  size_t offset,
63  rate_limited_operation(length, offset, std::move(completion_promise)),
64  socket(socket),
65  raw_buffer(nullptr),
66  shared_buffer(buffer)
67  {}
// Issues the write for permitted_length bytes. Only the shared-buffer call passes offset.
// NOTE(review): the lines naming the called function are not visible here; presumably
// asio::async_write_some, mirroring the read operation — confirm.
68  virtual void perform_operation() override
69  {
70  if (raw_buffer)
72  raw_buffer, permitted_length,
74  else
76  shared_buffer, permitted_length, offset,
78  }
79  };
80 
// Rate-limited read on a TCP socket. The destination is either a raw pointer
// (raw_buffer) or a shared buffer (shared_buffer); perform_operation() uses the
// raw pointer when it is non-null and the shared buffer otherwise.
82  {
83  public:
84  boost::asio::ip::tcp::socket& socket;
85  char* raw_buffer;
86  std::shared_ptr<char> shared_buffer;
87 
// Raw-pointer constructor: shared_buffer is left default-constructed.
88  rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket& socket,
89  char* buffer,
90  size_t length,
91  size_t offset,
93  rate_limited_operation(length, offset, std::move(completion_promise)),
94  socket(socket),
95  raw_buffer(buffer)
96  {}
// Shared-buffer constructor: raw_buffer is set to nullptr so perform_operation()
// takes the shared_buffer branch.
97  rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket& socket,
98  const std::shared_ptr<char>& buffer,
99  size_t length,
100  size_t offset,
102  rate_limited_operation(length, offset, std::move(completion_promise)),
103  socket(socket),
104  raw_buffer(nullptr),
105  shared_buffer(buffer)
106  {}
// Starts an asynchronous read of at most permitted_length bytes.
// Only the shared-buffer call passes offset.
// NOTE(review): the final argument line of each call is not visible here;
// presumably it passes completion_promise — confirm.
107  virtual void perform_operation() override
108  {
109  if (raw_buffer)
110  asio::async_read_some(socket,
111  raw_buffer, permitted_length,
113  else
114  asio::async_read_some(socket,
115  shared_buffer, permitted_length, offset,
117 
118  }
119  };
120 
// Comparator used with std::sort in process_pending_operations().
122  {
123  // less than operator designed to bring the shortest operations to the end
// Returns true when lhs is longer than rhs, i.e. sorting with it yields descending
// length order, leaving the shortest operation at back().
125  {
126  return lhs->length > rhs->length;
127  }
128  };
129 
// Tracks a running average transfer rate in bytes per second.
// The state members are mutable because get_average_rate() const refreshes the
// average through update_const().
131  {
132  private:
133  mutable double _average_rate; // current average, bytes per second
134  mutable uint32_t _unaccounted_bytes; // bytes reported while the clock had not advanced past _last_update_time
135  mutable time_point _last_update_time; // time of the last update that recomputed the average
136  microseconds _time_constant; // averaging window
137 
138  void update_const(uint32_t bytes_transferred = 0) const;
139  public:
140  average_rate_meter(const microseconds& time_constant = seconds(10));
141  void set_time_constant(const microseconds& time_constant);
142  void update(uint32_t bytes_transferred = 0);
143  uint32_t get_average_rate() const;
144  };
// Member initialisers of the average_rate_meter constructor (signature line not
// visible here): the rate starts at zero, nothing is pending, and the last update
// time is set to time_point_sec::min().
146  _average_rate(0.),
147  _unaccounted_bytes(0),
148  _last_update_time(time_point_sec::min()),
149  _time_constant(time_constant)
150  {}
// Replaces the averaging window; the current average is left untouched.
// NOTE(review): signature line not visible here; presumably
// average_rate_meter::set_time_constant(const microseconds&).
152  {
153  _time_constant = time_constant;
154  }
// Records bytes_transferred bytes as transferred now.
// Non-const entry point; all work is done by update_const().
155  void average_rate_meter::update(uint32_t bytes_transferred /* = 0 */)
156  {
157  update_const(bytes_transferred);
158  }
// Folds bytes_transferred into the running average, based on the time elapsed since
// _last_update_time. Declared const (state is mutable) so get_average_rate() const can
// call it.
//   - clock has not advanced: the bytes are only accumulated in _unaccounted_bytes
//   - more than one time constant elapsed: the previous average is discarded and the
//     new average is bytes_transferred spread over one time constant
//   - otherwise: the rate since the last update is blended into the average with
//     weight alpha = elapsed / time constant
// NOTE(review): a zero _time_constant would make both divisions below divide by zero.
159  void average_rate_meter::update_const(uint32_t bytes_transferred /* = 0 */) const
160  {
161  time_point now = time_point::now();
162  if (now <= _last_update_time)
163  _unaccounted_bytes += bytes_transferred;
164  else
165  {
166  microseconds time_since_last_update = now - _last_update_time;
167  if (time_since_last_update > _time_constant)
// _unaccounted_bytes is not added in this branch; it is cleared below.
168  _average_rate = bytes_transferred / (_time_constant.count() / (double)seconds(1).count());
169  else
170  {
// NOTE(review): 32-bit unsigned addition; wraps if the sum exceeds UINT32_MAX.
171  bytes_transferred += _unaccounted_bytes;
172  double seconds_since_last_update = time_since_last_update.count() / (double)seconds(1).count();
173  double rate_since_last_update = bytes_transferred / seconds_since_last_update;
174  double alpha = time_since_last_update.count() / (double)_time_constant.count();
175  _average_rate = rate_since_last_update * alpha + _average_rate * (1.0 - alpha);
176  }
177  _last_update_time = now;
178  _unaccounted_bytes = 0;
179  }
180  }
// Refreshes the average with a zero-byte update (so the value reflects the time elapsed
// since the last transfer), then returns it truncated to uint32_t.
// NOTE(review): signature line not visible here; presumably
// uint32_t average_rate_meter::get_average_rate() const.
182  {
183  update_const();
184  return (uint32_t)_average_rate;
185  }
186 
// Private implementation of rate_limiting_group. It keeps one token counter and two
// operation queues per direction; readsome/writesome queue an operation and wait on its
// promise, and process_pending_operations() hands out byte budgets.
// NOTE(review): the class head and several member declarations (limits, rate meters,
// loop futures, promises, iteration times) are not visible at this point in the file.
188  {
189  public:
193 
194  microseconds _granularity; // how often to add tokens to the bucket
195  uint32_t _read_tokens;
196  uint32_t _unused_read_tokens; // gets filled with tokens for unused bytes (if I'm allowed to read 200 bytes and I try to read 200 bytes, but can only read 50, tokens for the other 150 get returned here)
197  uint32_t _write_tokens;
199 
// The queues hold raw, non-owning pointers: readsome_impl/writesome_impl push the address
// of an operation object that lives on their own stack frame.
200  typedef std::list<rate_limited_operation*> rate_limited_operation_list;
201  rate_limited_operation_list _read_operations_in_progress;
202  rate_limited_operation_list _read_operations_for_next_iteration;
203  rate_limited_operation_list _write_operations_in_progress;
204  rate_limited_operation_list _write_operations_for_next_iteration;
205 
208 
213 
216 
217  rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second,
218  uint32_t burstiness_in_seconds = 1);
220 
// Socket I/O entry points (overrides); each pair forwards to the matching *_impl template.
221  virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override;
222  virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length, size_t offset) override;
223  template <typename BufferType>
224  size_t readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length, size_t offset);
225  virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override;
226  virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length, size_t offset) override;
227  template <typename BufferType>
228  size_t writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length, size_t offset);
229 
// Scheduling loops (one per direction) and the shared per-pass scheduler.
230  void process_pending_reads();
231  void process_pending_writes();
232  void process_pending_operations(time_point& last_iteration_start_time,
233  uint32_t& limit_bytes_per_second,
234  rate_limited_operation_list& operations_in_progress,
235  rate_limited_operation_list& operations_for_next_iteration,
236  uint32_t& tokens,
237  uint32_t& unused_tokens);
238  };
239 
// Constructs the limiter state.
//   upload_bytes_per_second   - upload limit; also the initial write token count
//   download_bytes_per_second - download limit; also the initial read token count
//   burstiness_in_seconds     - multiplier applied to the limit when capping the token
//                               count in process_pending_operations()
// The token counters are initialised from the constructor parameters rather than from
// the sibling members: members are initialised in declaration order, not in the order
// written here, so reading another member in this list depends on how the class lays
// its members out.
240  rate_limiting_group_impl::rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second,
241  uint32_t burstiness_in_seconds) :
242  _upload_bytes_per_second(upload_bytes_per_second),
243  _download_bytes_per_second(download_bytes_per_second),
244  _burstiness_in_seconds(burstiness_in_seconds),
245  _granularity(milliseconds(50)),
246  _read_tokens(download_bytes_per_second),
247  _unused_read_tokens(0),
248  _write_tokens(upload_bytes_per_second),
249  _unused_write_tokens(0)
250  {
251  }
252 
// Two independent best-effort steps: each runs in its own try block and any exception
// is swallowed by catch (...), so a failure in the first does not skip the second.
// NOTE(review): the signature line and the guarded statements are not visible here;
// presumably this is the destructor cancelling the read and write processing loops
// — confirm.
254  {
255  try
256  {
258  }
259  catch (...)
260  {
261  }
262  try
263  {
265  }
266  catch (...)
267  {
268  }
269  }
270 
// Shared-buffer read entry point: forwards unchanged to readsome_impl().
// Returns the number of bytes read.
271  size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length, size_t offset)
272  {
273  return readsome_impl(socket, buffer, length, offset);
274  }
275 
// Raw-pointer read entry point: forwards to readsome_impl() with an offset of 0.
// Returns the number of bytes read.
276  size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length)
277  {
278  return readsome_impl(socket, buffer, length, 0);
279  }
280 
// Common implementation of both readsome() overloads.
// In the queued branch it pushes a read operation, makes sure the processing loop is
// running, and blocks on the operation's promise until the read completes. In the else
// branch it reads directly with asio::read_some(). Either way the byte count is fed into
// _actual_download_rate and returned.
// NOTE(review): the condition selecting the queued branch and the statements around the
// async() call are not visible here; presumably the queued branch is taken when a
// download limit is set — confirm.
281  template <typename BufferType>
282  size_t rate_limiting_group_impl::readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length, size_t offset)
283  {
284  size_t bytes_read;
286  {
287  promise<size_t>::ptr completion_promise = promise<size_t>::create("rate_limiting_group_impl::readsome");
// read_operation lives on this stack frame; the queue stores only its address.
288  rate_limited_tcp_read_operation read_operation(socket, buffer, length, offset, completion_promise);
289  _read_operations_for_next_iteration.push_back(&read_operation);
290 
291  // launch the read processing loop if it isn't running, or signal it to resume if it's paused.
293  _process_pending_reads_loop_complete = async([=](){ process_pending_reads(); }, "process_pending_reads" );
296 
297  try
298  {
299  bytes_read = completion_promise->wait();
300  }
301  catch (...)
302  {
// Unlink the stack-allocated operation from both queues before unwinding, so neither
// list keeps a pointer to a destroyed object.
303  _read_operations_for_next_iteration.remove(&read_operation);
304  _read_operations_in_progress.remove(&read_operation);
305  throw;
306  }
// Hand back the part of the granted budget that the read did not consume.
307  _unused_read_tokens += read_operation.permitted_length - bytes_read;
308  }
309  else
310  bytes_read = asio::read_some(socket, buffer, length, offset);
311 
312  _actual_download_rate.update(bytes_read);
313 
314  return bytes_read;
315  }
316 
// Raw-pointer write entry point: forwards to writesome_impl() with an offset of 0.
// Returns the number of bytes written.
// NOTE(review): when writesome_impl() takes its queued branch it constructs a
// rate_limited_tcp_write_operation from this raw pointer, and that constructor's body is
// assert(false) — confirm this path is never used with a limit in effect.
317  size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length)
318  {
319  return writesome_impl(socket, buffer, length, 0);
320  }
321 
// Shared-buffer write entry point: forwards unchanged to writesome_impl().
// Returns the number of bytes written.
322  size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length, size_t offset)
323  {
324  return writesome_impl(socket, buffer, length, offset);
325  }
326 
// Common implementation of both writesome() overloads.
// In the queued branch it pushes a write operation, makes sure the processing loop is
// running, and blocks on the operation's promise until the write completes. In the else
// branch it writes directly with asio::write_some(). Either way the byte count is fed
// into _actual_upload_rate and returned.
// NOTE(review): the condition selecting the queued branch and the statements around the
// async() call are not visible here; presumably the queued branch is taken when an
// upload limit is set — confirm.
327  template <typename BufferType>
328  size_t rate_limiting_group_impl::writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length, size_t offset)
329  {
330  size_t bytes_written;
332  {
333  promise<size_t>::ptr completion_promise = promise<size_t>::create("rate_limiting_group_impl::writesome");
// write_operation lives on this stack frame; the queue stores only its address.
334  rate_limited_tcp_write_operation write_operation(socket, buffer, length, offset, completion_promise);
335  _write_operations_for_next_iteration.push_back(&write_operation);
336 
337  // launch the write processing loop if it isn't running, or signal it to resume if it's paused.
339  _process_pending_writes_loop_complete = async([=](){ process_pending_writes(); }, "process_pending_writes");
342 
343  try
344  {
345  bytes_written = completion_promise->wait();
346  }
347  catch (...)
348  {
// Unlink the stack-allocated operation from both queues before unwinding, so neither
// list keeps a pointer to a destroyed object.
349  _write_operations_for_next_iteration.remove(&write_operation);
350  _write_operations_in_progress.remove(&write_operation);
351  throw;
352  }
// Hand back the part of the granted budget that the write did not consume.
353  _unused_write_tokens += write_operation.permitted_length - bytes_written;
354  }
355  else
356  bytes_written = asio::write_some(socket, buffer, length, offset);
357 
358  _actual_upload_rate.update(bytes_written);
359 
360  return bytes_written;
361  }
362 
// Body of the read-scheduling task that readsome_impl() starts via async().
// Each pass of the loop creates a fresh _new_read_operation_available_promise and then
// runs a try block whose timeout_exception is deliberately ignored. No loop exit is
// visible in the lines shown.
// NOTE(review): the signature line and several statements are not visible here;
// presumably each pass calls process_pending_operations() for the read direction and
// then waits on the promise, with a timeout only while operations remain — confirm.
364  {
365  for (;;)
366  {
369 
370  _new_read_operation_available_promise = promise<void>::create("rate_limiting_group_impl::process_pending_reads");
371  try
372  {
373  if (_read_operations_in_progress.empty())
375  else
377  }
378  catch (const timeout_exception&)
379  {
380  }
382  }
383  }
// Body of the write-scheduling task that writesome_impl() starts via async().
// Each pass of the loop creates a fresh _new_write_operation_available_promise and then
// runs a try block whose timeout_exception is deliberately ignored. No loop exit is
// visible in the lines shown.
// NOTE(review): the signature line and several statements are not visible here;
// presumably each pass calls process_pending_operations() for the write direction and
// then waits on the promise, with a timeout only while operations remain — confirm.
385  {
386  for (;;)
387  {
390 
391  _new_write_operation_available_promise = promise<void>::create("rate_limiting_group_impl::process_pending_writes");
392  try
393  {
394  if (_write_operations_in_progress.empty())
396  else
398  }
399  catch (const timeout_exception&)
400  {
401  }
403  }
404  }
// Runs one scheduling pass over the queued operations of one direction.
//   last_iteration_start_time     - in/out: start of the previous pass; overwritten with
//                                   the start time of this pass
//   limit_bytes_per_second        - rate limit; 0 is treated as unlimited
//   operations_in_progress        - operations still waiting for a byte budget
//   operations_for_next_iteration - newly queued operations; appended to
//                                   operations_in_progress and then cleared
//   tokens                        - in/out: bytes currently available to hand out
//   unused_tokens                 - in/out: bytes granted earlier but not consumed;
//                                   folded into tokens and reset to 0
// NOTE(review): the first line of this definition (return type, name and first
// parameter) is not visible at this point in the file.
406  uint32_t& limit_bytes_per_second,
407  rate_limited_operation_list& operations_in_progress,
408  rate_limited_operation_list& operations_for_next_iteration,
409  uint32_t& tokens,
410  uint32_t& unused_tokens)
411  {
412  // lock here for multithreaded
413  std::copy(operations_for_next_iteration.begin(),
414  operations_for_next_iteration.end(),
415  std::back_inserter(operations_in_progress));
416  operations_for_next_iteration.clear();
417 
418  // find out how much time since our last read/write
419  time_point this_iteration_start_time = time_point::now();
420  if (limit_bytes_per_second) // then we are limiting up/download speed
421  {
// Elapsed time is clamped to [0, 1 second] so a long pause or a backwards clock step
// cannot mint an arbitrary number of tokens.
422  microseconds time_since_last_iteration = this_iteration_start_time - last_iteration_start_time;
423  if (time_since_last_iteration > seconds(1))
424  time_since_last_iteration = seconds(1);
425  else if (time_since_last_iteration < microseconds(0))
426  time_since_last_iteration = microseconds(0);
427 
// count() is int64_t, so the product below is computed in 64 bits before the cast.
// NOTE(review): the additions into tokens are 32-bit unsigned and can wrap; if
// _burstiness_in_seconds is also 32-bit (declaration not visible here), the cap
// limit_bytes_per_second * _burstiness_in_seconds can wrap as well for large limits.
428  tokens += (uint32_t)((limit_bytes_per_second * time_since_last_iteration.count()) / 1000000);
429  tokens += unused_tokens;
430  unused_tokens = 0;
431  tokens = std::min(tokens, limit_bytes_per_second * _burstiness_in_seconds);
432 
433  if (tokens)
434  {
435  // sort the pending reads/writes in order of the number of bytes they need to write, smallest first
// (is_operation_shorter sorts in descending length, so the smallest request is at
// back() and is therefore served first by the loop below.)
436  std::vector<rate_limited_operation*> operations_sorted_by_length;
437  operations_sorted_by_length.reserve(operations_in_progress.size());
438  for (rate_limited_operation* operation_data : operations_in_progress)
439  operations_sorted_by_length.push_back(operation_data);
440  std::sort(operations_sorted_by_length.begin(), operations_sorted_by_length.end(), is_operation_shorter());
441 
442  // figure out how many bytes each reader/writer is allowed to read/write
// Each operation is offered an equal share of what is still unallocated; an operation
// needing less than its share takes only its length, leaving the surplus for the longer
// operations processed after it.
443  uint32_t bytes_remaining_to_allocate = tokens;
444  while (!operations_sorted_by_length.empty())
445  {
446  uint32_t bytes_permitted_for_this_operation = bytes_remaining_to_allocate / operations_sorted_by_length.size();
447  uint32_t bytes_allocated_for_this_operation = std::min<size_t>(operations_sorted_by_length.back()->length, bytes_permitted_for_this_operation);
448  operations_sorted_by_length.back()->permitted_length = bytes_allocated_for_this_operation;
449  bytes_remaining_to_allocate -= bytes_allocated_for_this_operation;
450  operations_sorted_by_length.pop_back();
451  }
452  tokens = bytes_remaining_to_allocate;
453 
454  // kick off the reads/writes in first-come order
// Operations that received a zero budget stay queued for a later pass. The list entry is
// erased before perform_operation() is called.
455  for (auto iter = operations_in_progress.begin(); iter != operations_in_progress.end();)
456  {
457  if ((*iter)->permitted_length > 0)
458  {
459  rate_limited_operation* operation_to_perform = *iter;
460  iter = operations_in_progress.erase(iter);
461  operation_to_perform->perform_operation();
462  }
463  else
464  ++iter;
465  }
466  }
467  }
468  else // down/upload speed is unlimited
469  {
470  // we shouldn't end up here often. If the rate is unlimited, we should just execute
471  // the operation immediately without being queued up. This should only be hit if
472  // we change from a limited rate to unlimited
473  for (auto iter = operations_in_progress.begin();
474  iter != operations_in_progress.end();)
475  {
476  rate_limited_operation* operation_to_perform = *iter;
477  iter = operations_in_progress.erase(iter);
// Unlimited: grant the full requested length.
478  operation_to_perform->permitted_length = operation_to_perform->length;
479  operation_to_perform->perform_operation();
480  }
481  }
482  last_iteration_start_time = this_iteration_start_time;
483  }
484 
485  }
486 
// Creates the private implementation object (my) with the given upload limit, download
// limit and burstiness; all three values are passed through unchanged.
487  rate_limiting_group::rate_limiting_group(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second, uint32_t burstiness_in_seconds /* = 1 */) :
488  my(new detail::rate_limiting_group_impl(upload_bytes_per_second, download_bytes_per_second, burstiness_in_seconds))
489  {
490  }
491 
// Empty body: no explicit work is done here.
// NOTE(review): signature line not visible here; presumably the rate_limiting_group
// destructor — confirm.
493  {
494  }
495 
// Returns the current average upload rate reported by the impl's upload rate meter.
// NOTE(review): signature line not visible here; presumably
// uint32_t rate_limiting_group::get_actual_upload_rate() const.
497  {
498  return my->_actual_upload_rate.get_average_rate();
499  }
500 
// Returns the current average download rate reported by the impl's download rate meter.
// NOTE(review): signature line not visible here; presumably
// uint32_t rate_limiting_group::get_actual_download_rate() const.
502  {
503  return my->_actual_download_rate.get_average_rate();
504  }
505 
// Applies the same averaging time constant to both the upload and download rate meters.
// NOTE(review): signature line not visible here; presumably
// void rate_limiting_group::set_actual_rate_time_constant(microseconds time_constant).
507  {
508  my->_actual_upload_rate.set_time_constant(time_constant);
509  my->_actual_download_rate.set_time_constant(time_constant);
510  }
511 
// Stores a new upload limit in bytes per second. Only the stored limit changes; token
// counters are not touched here.
// NOTE(review): process_pending_operations() treats a zero limit as unlimited;
// presumably that applies to this value — confirm.
512  void rate_limiting_group::set_upload_limit(uint32_t upload_bytes_per_second)
513  {
514  my->_upload_bytes_per_second = upload_bytes_per_second;
515  }
516 
// Returns the currently stored upload limit in bytes per second.
// NOTE(review): signature line not visible here; presumably
// uint32_t rate_limiting_group::get_upload_limit() const.
518  {
519  return my->_upload_bytes_per_second;
520  }
521 
// Stores a new download limit in bytes per second. Only the stored limit changes; token
// counters are not touched here.
// NOTE(review): process_pending_operations() treats a zero limit as unlimited;
// presumably that applies to this value — confirm.
522  void rate_limiting_group::set_download_limit(uint32_t download_bytes_per_second)
523  {
524  my->_download_bytes_per_second = download_bytes_per_second;
525  }
526 
// Returns the currently stored download limit in bytes per second.
// NOTE(review): signature line not visible here; presumably
// uint32_t rate_limiting_group::get_download_limit() const.
528  {
529  return my->_download_bytes_per_second;
530  }
531 
// Installs this group's implementation object as the socket's I/O hooks.
// NOTE(review): the socket is given the raw pointer from my.get(); presumably the group
// must outlive the socket's use of the hooks (or remove_tcp_socket() must be called
// first) — confirm. Signature line not visible here; presumably
// void rate_limiting_group::add_tcp_socket(tcp_socket* tcp_socket_to_limit).
533  {
534  tcp_socket_to_limit->set_io_hooks(my.get());
535  }
536 
// Detaches the socket from rate limiting by clearing its I/O hooks.
//   tcp_socket_to_stop_limiting - socket whose hooks are reset; must not be null, since
//                                 it is dereferenced unconditionally
// Uses nullptr (as the rest of this file does) instead of the NULL macro so the argument
// is a typed null pointer.
537  void rate_limiting_group::remove_tcp_socket(tcp_socket* tcp_socket_to_stop_limiting)
538  {
539  tcp_socket_to_stop_limiting->set_io_hooks(nullptr);
540  }
541 
542 
543 } // namespace fc
void process_pending_operations(time_point &last_iteration_start_time, uint32_t &limit_bytes_per_second, rate_limited_operation_list &operations_in_progress, rate_limited_operation_list &operations_for_next_iteration, uint32_t &tokens, uint32_t &unused_tokens)
bool valid() const
Definition: future.hpp:311
auto async(Functor &&f, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> fc::future< decltype(f())>
Definition: thread.hpp:227
static ptr create(const char *desc FC_TASK_NAME_DEFAULT_ARG)
Definition: future.hpp:114
future< size_t > write_some(AsyncWriteStream &s, const ConstBufferSequence &buf)
wraps boost::asio::async_write_some
Definition: asio.hpp:193
uint32_t get_upload_limit() const
rate_limited_operation_list _write_operations_for_next_iteration
void set_io_hooks(tcp_socket_io_hooks *new_hooks)
Definition: tcp_socket.cpp:236
rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket &socket, const std::shared_ptr< const char > &buffer, size_t length, size_t offset, promise< size_t >::ptr completion_promise)
rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second, uint32_t burstiness_in_seconds=1)
microseconds milliseconds(int64_t s)
Definition: time.hpp:35
uint32_t get_actual_upload_rate() const
std::shared_ptr< const char > shared_buffer
uint32_t get_average_rate() const
std::list< rate_limited_operation * > rate_limited_operation_list
rate_limited_operation_list _read_operations_in_progress
void add_tcp_socket(tcp_socket *tcp_socket_to_limit)
void update(uint32_t bytes_transferred=0)
promise< size_t >::ptr completion_promise
size_t readsome_impl(boost::asio::ip::tcp::socket &socket, const BufferType &buffer, size_t length, size_t offset)
void set_actual_rate_time_constant(microseconds time_constant)
void async_read_some(AsyncReadStream &s, const MutableBufferSequence &buf, promise< size_t >::ptr completion_promise)
Definition: asio.hpp:149
const T & wait(const microseconds &timeout=microseconds::maximum())
Definition: future.hpp:127
microseconds seconds(int64_t s)
Definition: time.hpp:34
boost::asio::ip::tcp::socket & socket
int64_t count() const
Definition: time.hpp:28
void cancel_and_wait(const char *reason FC_CANCELATION_REASON_DEFAULT_ARG)
Definition: future.hpp:314
void set_time_constant(const microseconds &time_constant)
void remove_tcp_socket(tcp_socket *tcp_socket_to_stop_limiting)
rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket &socket, char *buffer, size_t length, size_t offset, promise< size_t >::ptr completion_promise)
promise< void >::ptr _new_read_operation_available_promise
future< void > _process_pending_writes_loop_complete
void set_download_limit(uint32_t download_bytes_per_second)
boost::asio::ip::tcp::socket & socket
rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket &socket, const char *buffer, size_t length, size_t offset, promise< size_t >::ptr completion_promise)
bool operator()(const rate_limited_operation *lhs, const rate_limited_operation *rhs)
rate_limited_operation_list _read_operations_for_next_iteration
bool ready() const
Definition: future.hpp:327
uint32_t get_actual_download_rate() const
rate_limited_operation(size_t length, size_t offset, promise< size_t >::ptr &&completion_promise)
Defines exceptions used by fc.
rate_limiting_group(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second, uint32_t burstiness_in_seconds=1)
virtual size_t readsome(boost::asio::ip::tcp::socket &socket, char *buffer, size_t length) override
future< size_t > read_some(AsyncReadStream &s, const MutableBufferSequence &buf)
Definition: asio.hpp:123
void wait(const microseconds &timeout=microseconds::maximum())
Definition: future.hpp:174
uint32_t get_download_limit() const
void async_write_some(AsyncWriteStream &s, const ConstBufferSequence &buf, promise< size_t >::ptr completion_promise)
wraps boost::asio::async_write_some
Definition: asio.hpp:221
void set_upload_limit(uint32_t upload_bytes_per_second)
Definition: api.hpp:15
promise< void >::ptr _new_write_operation_available_promise
virtual size_t writesome(boost::asio::ip::tcp::socket &socket, const char *buffer, size_t length) override
void copy(const path &from, const path &to)
Definition: filesystem.cpp:241
static time_point now()
Definition: time.cpp:13
std::shared_ptr< promise< T > > ptr
Definition: future.hpp:111
rate_limited_operation_list _write_operations_in_progress
average_rate_meter(const microseconds &time_constant=seconds(10))
rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket &socket, const std::shared_ptr< char > &buffer, size_t length, size_t offset, promise< size_t >::ptr completion_promise)
size_t writesome_impl(boost::asio::ip::tcp::socket &socket, const BufferType &buffer, size_t length, size_t offset)