BitShares-Core  7.0.0
BitShares blockchain node software and command-line wallet software
asio.cpp
Go to the documentation of this file.
1 #include <fc/asio.hpp>
2 #include <fc/thread/thread.hpp>
3 #include <boost/thread.hpp>
4 #include <fc/log/logger.hpp>
6 #include <boost/scope_exit.hpp>
7 #include <algorithm>
8 #include <thread>
9 
10 namespace fc {
11  namespace asio {
12  namespace detail {
13 
15  _completion_promise(completion_promise)
16  {
17  // assert(false); // to detect anywhere we're not passing in a shared buffer
18  }
19  void read_write_handler::operator()(const boost::system::error_code& ec, size_t bytes_transferred)
20  {
21  // assert(false); // to detect anywhere we're not passing in a shared buffer
22  if( !ec )
23  _completion_promise->set_value(bytes_transferred);
24  else if( ec == boost::asio::error::eof )
25  _completion_promise->set_exception( fc::exception_ptr( new fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
26  else
27  _completion_promise->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
28  }
30  const std::shared_ptr<const char>& buffer) :
31  _completion_promise(completion_promise),
32  _buffer(buffer)
33  {}
34  void read_write_handler_with_buffer::operator()(const boost::system::error_code& ec, size_t bytes_transferred)
35  {
36  if( !ec )
37  _completion_promise->set_value(bytes_transferred);
38  else if( ec == boost::asio::error::eof )
39  _completion_promise->set_exception( fc::exception_ptr( new fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
40  else
41  _completion_promise->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
42  }
43 
44  void read_write_handler_ec( promise<size_t>* p, boost::system::error_code* oec, const boost::system::error_code& ec, size_t bytes_transferred ) {
45  p->set_value(bytes_transferred);
46  *oec = ec;
47  }
49  const boost::system::error_code& ec ) {
50  if( !ec )
51  p->set_value();
52  else
53  {
54  if( ec == boost::asio::error::eof )
55  {
56  p->set_exception( std::make_shared<fc::eof_exception>(
57  FC_LOG_MESSAGE( error, "${message} ",
58  ("message", boost::system::system_error(ec).what())) ) );
59  }
60  else if( ec == boost::asio::error::operation_aborted )
61  {
62  p->set_exception( std::make_shared<fc::canceled_exception>(
63  FC_LOG_MESSAGE( error, "${message} ",
64  ("message", boost::system::system_error(ec).what())) ) );
65  }
66  else
67  {
68  p->set_exception( std::make_shared<fc::exception>(
69  FC_LOG_MESSAGE( error, "${message} ",
70  ("message", boost::system::system_error(ec).what())) ) );
71  }
72  }
73  }
74 
76  const boost::system::error_code& ec ) {
77  p->set_value(ec);
78  }
79 
80  template<typename EndpointType, typename IteratorType>
82  const typename promise<std::vector<EndpointType> >::ptr& p,
83  const boost::system::error_code& ec,
84  IteratorType itr) {
85  if( !ec ) {
86  std::vector<EndpointType> eps;
87  while( itr != IteratorType() ) {
88  eps.push_back(*itr);
89  ++itr;
90  }
91  p->set_value( eps );
92  } else {
93  p->set_exception(
95  FC_LOG_MESSAGE( error, "process exited with: ${message} ",
96  ("message", boost::system::system_error(ec).what())) ) ) );
97  }
98  }
99  }
100 
102 
103  /***
104  * @brief set the default number of threads for the io service
105  *
106  * Sets the number of threads for the io service. This will throw
107  * an exception if called more than once.
108  *
109  * @param num_threads the number of threads
110  */
111  void default_io_service_scope::set_num_threads(uint16_t num_threads) {
113  num_io_threads = num_threads;
114  }
115 
117 
118  /***
119  * Default constructor
120  */
122  {
123  io = new boost::asio::io_service();
124  the_work = new boost::asio::io_service::work(*io);
125 
126  if( num_io_threads == 0 )
127  {
128  // the default was not set by the configuration. Determine a good
129  // number of threads. Minimum of 8, maximum of hardware_concurrency
130  num_io_threads = std::max( boost::thread::hardware_concurrency(), 8U );
131  }
132 
133  for( uint16_t i = 0; i < num_io_threads; ++i )
134  {
135  asio_threads.push_back( new boost::thread( [i,this]()
136  {
137  fc::thread::current().set_name( "fc::asio worker #" + fc::to_string(i) );
138 
139  BOOST_SCOPE_EXIT(void)
140  {
142  }
143  BOOST_SCOPE_EXIT_END
144 
145  while (!io->stopped())
146  {
147  try
148  {
149  io->run();
150  }
151  catch (const fc::exception& e)
152  {
153  elog("Caught unhandled exception in asio service loop: ${e}", ("e", e));
154  }
155  catch (const std::exception& e)
156  {
157  elog("Caught unhandled exception in asio service loop: ${e}", ("e", e.what()));
158  }
159  catch (...)
160  {
161  elog("Caught unhandled exception in asio service loop");
162  }
163  }
164  }) );
165  } // build thread loop
166  } // end of constructor
167 
168  /***
169  * destructor
170  */
172  {
173  delete the_work;
174  io->stop();
175  for( auto asio_thread : asio_threads )
176  {
177  asio_thread->join();
178  }
179  delete io;
180  for( auto asio_thread : asio_threads )
181  {
182  delete asio_thread;
183  }
184  } // end of destructor
185 
186  /***
187  * @brief create an io_service
188  * @returns the io_service
189  */
190  boost::asio::io_service& default_io_service() {
191  static default_io_service_scope fc_asio_service[1];
192  return *fc_asio_service[0].io;
193  }
194 
195  namespace tcp {
196  std::vector<boost::asio::ip::tcp::endpoint> resolve( const std::string& hostname, const std::string& port)
197  {
198  try
199  {
202  res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port),
203  boost::bind( detail::resolve_handler<boost::asio::ip::tcp::endpoint,resolver_iterator>, p, _1, _2 ) );
204  return p->wait();
205  }
206  FC_RETHROW_EXCEPTIONS(warn, "")
207  }
208  }
209  namespace udp {
210  std::vector<udp::endpoint> resolve( resolver& r, const std::string& hostname, const std::string& port)
211  {
212  try
213  {
215  promise<std::vector<endpoint> >::ptr p = promise<std::vector<endpoint> >::create("udp::resolve completion");
216  res.async_resolve( resolver::query(hostname,port),
217  boost::bind( detail::resolve_handler<endpoint,resolver_iterator>, p, _1, _2 ) );
218  return p->wait();
219  }
220  FC_RETHROW_EXCEPTIONS(warn, "")
221  }
222  }
223 
224 } } // namespace fc::asio
fc::promise
Definition: future.hpp:109
fc::thread::current
static thread & current()
Definition: thread.cpp:125
fc::asio::detail::read_write_handler_with_buffer::read_write_handler_with_buffer
read_write_handler_with_buffer(const promise< size_t >::ptr &p, const std::shared_ptr< const char > &buffer)
Definition: asio.cpp:29
fc::asio::detail::read_write_handler_with_buffer::operator()
void operator()(const boost::system::error_code &ec, size_t bytes_transferred)
Definition: asio.cpp:34
fc::promise::ptr
std::shared_ptr< promise< T > > ptr
Definition: future.hpp:111
fc::thread::set_name
void set_name(const string &n)
associates a name with this thread.
Definition: thread.cpp:143
fc::asio::detail::resolve_handler
void resolve_handler(const typename promise< std::vector< EndpointType > >::ptr &p, const boost::system::error_code &ec, IteratorType itr)
Definition: asio.cpp:81
fc::exception
Used to generate a useful error report when an exception is thrown.
Definition: exception.hpp:56
fc::to_string
std::string to_string(double)
Definition: string.cpp:73
fc::promise::wait
const T & wait(const microseconds &timeout=microseconds::maximum())
Definition: future.hpp:127
fc
Definition: api.hpp:15
fc::asio::detail::error_handler_ec
void error_handler_ec(promise< boost::system::error_code > *p, const boost::system::error_code &ec)
Definition: asio.cpp:75
fc::asio::tcp::resolver
boost::asio::ip::tcp::resolver resolver
Definition: asio.hpp:241
fc::asio::default_io_service_scope
Definition: asio.hpp:74
fc::exception_ptr
std::shared_ptr< exception > exception_ptr
Definition: exception.hpp:131
fc::asio::default_io_service_scope::~default_io_service_scope
~default_io_service_scope()
Definition: asio.cpp:171
asio.hpp
fc::asio::default_io_service_scope::get_num_threads
static uint16_t get_num_threads()
Definition: asio.cpp:116
thread.hpp
fc::asio::default_io_service_scope::default_io_service_scope
default_io_service_scope()
Definition: asio.cpp:121
fc::asio::detail::read_write_handler_ec
void read_write_handler_ec(promise< size_t > *p, boost::system::error_code *oec, const boost::system::error_code &ec, size_t bytes_transferred)
Definition: asio.cpp:44
fc::promise::set_value
void set_value(const T &v)
Definition: future.hpp:136
FC_ASSERT
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
Definition: exception.hpp:345
exception.hpp
Defines exception's used by fc.
fc::asio::detail::read_write_handler::read_write_handler
read_write_handler(const promise< size_t >::ptr &p)
Definition: asio.cpp:14
fc::asio::default_io_service_scope::num_io_threads
static uint16_t num_io_threads
Definition: asio.hpp:89
fc::asio::detail::error_handler
void error_handler(const promise< void >::ptr &p, const boost::system::error_code &ec)
Definition: asio.cpp:48
fc::promise_base::set_exception
void set_exception(const fc::exception_ptr &e)
Definition: future.cpp:44
FC_RETHROW_EXCEPTIONS
#define FC_RETHROW_EXCEPTIONS(LOG_LEVEL, FORMAT,...)
Catchs all exception's, std::exceptions, and ... and rethrows them after appending the provided log m...
Definition: exception.hpp:464
logger.hpp
fc::asio::detail::read_write_handler::operator()
void operator()(const boost::system::error_code &ec, size_t bytes_transferred)
Definition: asio.cpp:19
fc::asio::default_io_service
boost::asio::io_service & default_io_service()
Definition: asio.cpp:190
fc::thread::cleanup
static void cleanup()
Definition: thread.cpp:131
fc::asio::udp::resolve
std::vector< endpoint > resolve(resolver &r, const std::string &hostname, const std::string &port)
resolve all udp::endpoints for hostname:port
Definition: asio.cpp:210
fc::asio::default_io_service_scope::set_num_threads
static void set_num_threads(uint16_t num_threads)
Definition: asio.cpp:111
fc::asio::default_io_service_scope::io
boost::asio::io_service * io
Definition: asio.hpp:84
fc::asio::tcp::resolve
std::vector< endpoint > resolve(const std::string &hostname, const std::string &port)
Definition: asio.cpp:196
fc::asio::udp::resolver
boost::asio::ip::udp::resolver resolver
Definition: asio.hpp:272
FC_LOG_MESSAGE
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
Definition: log_message.hpp:163
elog
#define elog(FORMAT,...)
Definition: logger.hpp:129