34 completion_promise(completion_promise)
43 boost::asio::ip::tcp::socket&
socket;
59 const std::shared_ptr<const char>& buffer,
84 boost::asio::ip::tcp::socket&
socket;
98 const std::shared_ptr<char>& buffer,
105 shared_buffer(buffer)
133 mutable double _average_rate;
134 mutable uint32_t _unaccounted_bytes;
138 void update_const(uint32_t bytes_transferred = 0)
const;
141 void set_time_constant(
const microseconds& time_constant);
142 void update(uint32_t bytes_transferred = 0);
143 uint32_t get_average_rate()
const;
147 _unaccounted_bytes(0),
149 _time_constant(time_constant)
153 _time_constant = time_constant;
157 update_const(bytes_transferred);
159 void average_rate_meter::update_const(uint32_t bytes_transferred )
const 162 if (now <= _last_update_time)
163 _unaccounted_bytes += bytes_transferred;
166 microseconds time_since_last_update = now - _last_update_time;
167 if (time_since_last_update > _time_constant)
168 _average_rate = bytes_transferred / (_time_constant.
count() / (double)
seconds(1).
count());
171 bytes_transferred += _unaccounted_bytes;
172 double seconds_since_last_update = time_since_last_update.
count() / (double)
seconds(1).
count();
173 double rate_since_last_update = bytes_transferred / seconds_since_last_update;
174 double alpha = time_since_last_update.
count() / (double)_time_constant.
count();
175 _average_rate = rate_since_last_update * alpha + _average_rate * (1.0 - alpha);
177 _last_update_time = now;
178 _unaccounted_bytes = 0;
184 return (uint32_t)_average_rate;
218 uint32_t burstiness_in_seconds = 1);
221 virtual size_t readsome(boost::asio::ip::tcp::socket& socket,
char* buffer,
size_t length)
override;
222 virtual size_t readsome(boost::asio::ip::tcp::socket& socket,
const std::shared_ptr<char>& buffer,
size_t length,
size_t offset)
override;
223 template <
typename BufferType>
224 size_t readsome_impl(boost::asio::ip::tcp::socket& socket,
const BufferType& buffer,
size_t length,
size_t offset);
225 virtual size_t writesome(boost::asio::ip::tcp::socket& socket,
const char* buffer,
size_t length)
override;
226 virtual size_t writesome(boost::asio::ip::tcp::socket& socket,
const std::shared_ptr<const char>& buffer,
size_t length,
size_t offset)
override;
227 template <
typename BufferType>
228 size_t writesome_impl(boost::asio::ip::tcp::socket& socket,
const BufferType& buffer,
size_t length,
size_t offset);
230 void process_pending_reads();
231 void process_pending_writes();
232 void process_pending_operations(
time_point& last_iteration_start_time,
233 uint32_t& limit_bytes_per_second,
234 rate_limited_operation_list& operations_in_progress,
235 rate_limited_operation_list& operations_for_next_iteration,
237 uint32_t& unused_tokens);
241 uint32_t burstiness_in_seconds) :
242 _upload_bytes_per_second(upload_bytes_per_second),
243 _download_bytes_per_second(download_bytes_per_second),
244 _burstiness_in_seconds(burstiness_in_seconds),
246 _read_tokens(_download_bytes_per_second),
247 _unused_read_tokens(0),
248 _write_tokens(_upload_bytes_per_second),
249 _unused_write_tokens(0)
281 template <
typename BufferType>
299 bytes_read = completion_promise->
wait();
327 template <
typename BufferType>
330 size_t bytes_written;
345 bytes_written = completion_promise->
wait();
360 return bytes_written;
378 catch (
const timeout_exception&)
399 catch (
const timeout_exception&)
406 uint32_t& limit_bytes_per_second,
410 uint32_t& unused_tokens)
413 std::copy(operations_for_next_iteration.begin(),
414 operations_for_next_iteration.end(),
415 std::back_inserter(operations_in_progress));
416 operations_for_next_iteration.clear();
420 if (limit_bytes_per_second)
422 microseconds time_since_last_iteration = this_iteration_start_time - last_iteration_start_time;
423 if (time_since_last_iteration >
seconds(1))
424 time_since_last_iteration =
seconds(1);
428 tokens += (uint32_t)((limit_bytes_per_second * time_since_last_iteration.
count()) / 1000000);
429 tokens += unused_tokens;
436 std::vector<rate_limited_operation*> operations_sorted_by_length;
437 operations_sorted_by_length.reserve(operations_in_progress.size());
439 operations_sorted_by_length.push_back(operation_data);
440 std::sort(operations_sorted_by_length.begin(), operations_sorted_by_length.end(),
is_operation_shorter());
443 uint32_t bytes_remaining_to_allocate = tokens;
444 while (!operations_sorted_by_length.empty())
446 uint32_t bytes_permitted_for_this_operation = bytes_remaining_to_allocate / operations_sorted_by_length.size();
447 uint32_t bytes_allocated_for_this_operation = std::min<size_t>(operations_sorted_by_length.back()->length, bytes_permitted_for_this_operation);
448 operations_sorted_by_length.back()->permitted_length = bytes_allocated_for_this_operation;
449 bytes_remaining_to_allocate -= bytes_allocated_for_this_operation;
450 operations_sorted_by_length.pop_back();
452 tokens = bytes_remaining_to_allocate;
455 for (
auto iter = operations_in_progress.begin(); iter != operations_in_progress.end();)
457 if ((*iter)->permitted_length > 0)
460 iter = operations_in_progress.erase(iter);
473 for (
auto iter = operations_in_progress.begin();
474 iter != operations_in_progress.end();)
477 iter = operations_in_progress.erase(iter);
482 last_iteration_start_time = this_iteration_start_time;
498 return my->_actual_upload_rate.get_average_rate();
503 return my->_actual_download_rate.get_average_rate();
508 my->_actual_upload_rate.set_time_constant(time_constant);
509 my->_actual_download_rate.set_time_constant(time_constant);
514 my->_upload_bytes_per_second = upload_bytes_per_second;
519 return my->_upload_bytes_per_second;
524 my->_download_bytes_per_second = download_bytes_per_second;
529 return my->_download_bytes_per_second;
void process_pending_operations(time_point &last_iteration_start_time, uint32_t &limit_bytes_per_second, rate_limited_operation_list &operations_in_progress, rate_limited_operation_list &operations_for_next_iteration, uint32_t &tokens, uint32_t &unused_tokens)
virtual void perform_operation() override
auto async(Functor &&f, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> fc::future< decltype(f())>
static ptr create(const char *desc FC_TASK_NAME_DEFAULT_ARG)
future< size_t > write_some(AsyncWriteStream &s, const ConstBufferSequence &buf)
wraps boost::asio::async_write_some
uint32_t _burstiness_in_seconds
rate_limited_operation_list _write_operations_for_next_iteration
void set_io_hooks(tcp_socket_io_hooks *new_hooks)
rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket &socket, const std::shared_ptr< const char > &buffer, size_t length, size_t offset, promise< size_t >::ptr completion_promise)
rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second, uint32_t burstiness_in_seconds=1)
uint32_t _download_bytes_per_second
microseconds milliseconds(int64_t s)
time_point _last_read_iteration_time
std::shared_ptr< const char > shared_buffer
std::list< rate_limited_operation * > rate_limited_operation_list
rate_limited_operation_list _read_operations_in_progress
void add_tcp_socket(tcp_socket *tcp_socket_to_limit)
void update(uint32_t bytes_transferred=0)
promise< size_t >::ptr completion_promise
microseconds _granularity
size_t readsome_impl(boost::asio::ip::tcp::socket &socket, const BufferType &buffer, size_t length, size_t offset)
void set_actual_rate_time_constant(microseconds time_constant)
void process_pending_reads()
void async_read_some(AsyncReadStream &s, const MutableBufferSequence &buf, promise< size_t >::ptr completion_promise)
const T & wait(const microseconds &timeout=microseconds::maximum())
void process_pending_writes()
uint32_t get_average_rate() const
microseconds seconds(int64_t s)
boost::asio::ip::tcp::socket & socket
void cancel_and_wait(const char *reason FC_CANCELATION_REASON_DEFAULT_ARG)
void set_time_constant(const microseconds &time_constant)
void remove_tcp_socket(tcp_socket *tcp_socket_to_stop_limiting)
std::shared_ptr< char > shared_buffer
rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket &socket, char *buffer, size_t length, size_t offset, promise< size_t >::ptr completion_promise)
promise< void >::ptr _new_read_operation_available_promise
future< void > _process_pending_writes_loop_complete
uint32_t get_actual_download_rate() const
void set_download_limit(uint32_t download_bytes_per_second)
boost::asio::ip::tcp::socket & socket
virtual void perform_operation() override
rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket &socket, const char *buffer, size_t length, size_t offset, promise< size_t >::ptr completion_promise)
uint32_t get_download_limit() const
bool operator()(const rate_limited_operation *lhs, const rate_limited_operation *rhs)
uint32_t _unused_write_tokens
rate_limited_operation_list _read_operations_for_next_iteration
future< void > _process_pending_reads_loop_complete
average_rate_meter _actual_download_rate
rate_limited_operation(size_t length, size_t offset, promise< size_t >::ptr &&completion_promise)
Defines exception's used by fc.
rate_limiting_group(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second, uint32_t burstiness_in_seconds=1)
virtual size_t readsome(boost::asio::ip::tcp::socket &socket, char *buffer, size_t length) override
average_rate_meter _actual_upload_rate
uint32_t get_upload_limit() const
future< size_t > read_some(AsyncReadStream &s, const MutableBufferSequence &buf)
void wait(const microseconds &timeout=microseconds::maximum())
void async_write_some(AsyncWriteStream &s, const ConstBufferSequence &buf, promise< size_t >::ptr completion_promise)
wraps boost::asio::async_write_some
time_point _last_write_iteration_time
void set_upload_limit(uint32_t upload_bytes_per_second)
uint32_t _unused_read_tokens
promise< void >::ptr _new_write_operation_available_promise
virtual void perform_operation()=0
virtual size_t writesome(boost::asio::ip::tcp::socket &socket, const char *buffer, size_t length) override
void copy(const path &from, const path &to)
std::shared_ptr< promise< T > > ptr
uint32_t get_actual_upload_rate() const
rate_limited_operation_list _write_operations_in_progress
average_rate_meter(const microseconds &time_constant=seconds(10))
uint32_t _upload_bytes_per_second
rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket &socket, const std::shared_ptr< char > &buffer, size_t length, size_t offset, promise< size_t >::ptr completion_promise)
~rate_limiting_group_impl()
size_t writesome_impl(boost::asio::ip::tcp::socket &socket, const BufferType &buffer, size_t length, size_t offset)