BitShares-Core  6.1.0
BitShares blockchain implementation and command-line interface software
asio.cpp
Go to the documentation of this file.
1 #include <fc/asio.hpp>
2 #include <fc/thread/thread.hpp>
3 #include <boost/thread.hpp>
4 #include <fc/log/logger.hpp>
6 #include <boost/scope_exit.hpp>
7 #include <algorithm>
8 #include <thread>
9 
10 namespace fc {
11  namespace asio {
12  namespace detail {
13 
15  _completion_promise(completion_promise)
16  {
17  // assert(false); // to detect anywhere we're not passing in a shared buffer
18  }
19  void read_write_handler::operator()(const boost::system::error_code& ec, size_t bytes_transferred)
20  {
21  // assert(false); // to detect anywhere we're not passing in a shared buffer
22  if( !ec )
23  _completion_promise->set_value(bytes_transferred);
24  else if( ec == boost::asio::error::eof )
25  _completion_promise->set_exception( fc::exception_ptr( new fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
26  else
27  _completion_promise->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
28  }
30  const std::shared_ptr<const char>& buffer) :
31  _completion_promise(completion_promise),
32  _buffer(buffer)
33  {}
34  void read_write_handler_with_buffer::operator()(const boost::system::error_code& ec, size_t bytes_transferred)
35  {
36  if( !ec )
37  _completion_promise->set_value(bytes_transferred);
38  else if( ec == boost::asio::error::eof )
39  _completion_promise->set_exception( fc::exception_ptr( new fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
40  else
41  _completion_promise->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
42  }
43 
44  void read_write_handler_ec( promise<size_t>* p, boost::system::error_code* oec, const boost::system::error_code& ec, size_t bytes_transferred ) {
45  p->set_value(bytes_transferred);
46  *oec = ec;
47  }
49  const boost::system::error_code& ec ) {
50  if( !ec )
51  p->set_value();
52  else
53  {
54  if( ec == boost::asio::error::eof )
55  {
56  p->set_exception( fc::exception_ptr( new fc::eof_exception(
57  FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
58  }
59  else
60  {
62  FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
63  }
64  }
65  }
66 
68  const boost::system::error_code& ec ) {
69  p->set_value(ec);
70  }
71 
72  template<typename EndpointType, typename IteratorType>
74  const typename promise<std::vector<EndpointType> >::ptr& p,
75  const boost::system::error_code& ec,
76  IteratorType itr) {
77  if( !ec ) {
78  std::vector<EndpointType> eps;
79  while( itr != IteratorType() ) {
80  eps.push_back(*itr);
81  ++itr;
82  }
83  p->set_value( eps );
84  } else {
85  p->set_exception(
87  FC_LOG_MESSAGE( error, "process exited with: ${message} ",
88  ("message", boost::system::system_error(ec).what())) ) ) );
89  }
90  }
91  }
92 
94 
95  /***
96  * @brief set the default number of threads for the io service
97  *
98  * Sets the number of threads for the io service. This will throw
99  * an exception if called more than once.
100  *
101  * @param num_threads the number of threads
102  */
103  void default_io_service_scope::set_num_threads(uint16_t num_threads) {
104  FC_ASSERT(num_io_threads == 0);
105  num_io_threads = num_threads;
106  }
107 
108  uint16_t default_io_service_scope::get_num_threads() { return num_io_threads; }
109 
110  /***
111  * Default constructor
112  */
114  {
115  io = new boost::asio::io_service();
116  the_work = new boost::asio::io_service::work(*io);
117 
118  if( num_io_threads == 0 )
119  {
120  // the default was not set by the configuration. Determine a good
121  // number of threads. Minimum of 8, maximum of hardware_concurrency
122  num_io_threads = std::max( boost::thread::hardware_concurrency(), 8U );
123  }
124 
125  for( uint16_t i = 0; i < num_io_threads; ++i )
126  {
127  asio_threads.push_back( new boost::thread( [i,this]()
128  {
129  fc::thread::current().set_name( "fc::asio worker #" + fc::to_string(i) );
130 
131  BOOST_SCOPE_EXIT(void)
132  {
134  }
135  BOOST_SCOPE_EXIT_END
136 
137  while (!io->stopped())
138  {
139  try
140  {
141  io->run();
142  }
143  catch (const fc::exception& e)
144  {
145  elog("Caught unhandled exception in asio service loop: ${e}", ("e", e));
146  }
147  catch (const std::exception& e)
148  {
149  elog("Caught unhandled exception in asio service loop: ${e}", ("e", e.what()));
150  }
151  catch (...)
152  {
153  elog("Caught unhandled exception in asio service loop");
154  }
155  }
156  }) );
157  } // build thread loop
158  } // end of constructor
159 
160  /***
161  * destructor
162  */
164  {
165  delete the_work;
166  io->stop();
167  for( auto asio_thread : asio_threads )
168  {
169  asio_thread->join();
170  }
171  delete io;
172  for( auto asio_thread : asio_threads )
173  {
174  delete asio_thread;
175  }
176  } // end of destructor
177 
178  /***
179  * @brief create an io_service
180  * @returns the io_service
181  */
182  boost::asio::io_service& default_io_service() {
183  static default_io_service_scope fc_asio_service[1];
184  return *fc_asio_service[0].io;
185  }
186 
187  namespace tcp {
188  std::vector<boost::asio::ip::tcp::endpoint> resolve( const std::string& hostname, const std::string& port)
189  {
190  try
191  {
194  res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port),
195  boost::bind( detail::resolve_handler<boost::asio::ip::tcp::endpoint,resolver_iterator>, p, _1, _2 ) );
196  return p->wait();
197  }
198  FC_RETHROW_EXCEPTIONS(warn, "")
199  }
200  }
201  namespace udp {
202  std::vector<udp::endpoint> resolve( resolver& r, const std::string& hostname, const std::string& port)
203  {
204  try
205  {
207  promise<std::vector<endpoint> >::ptr p = promise<std::vector<endpoint> >::create("udp::resolve completion");
208  res.async_resolve( resolver::query(hostname,port),
209  boost::bind( detail::resolve_handler<endpoint,resolver_iterator>, p, _1, _2 ) );
210  return p->wait();
211  }
212  FC_RETHROW_EXCEPTIONS(warn, "")
213  }
214  }
215 
216 } } // namespace fc::asio
static void cleanup()
Definition: thread.cpp:131
void read_write_handler_ec(promise< size_t > *p, boost::system::error_code *oec, const boost::system::error_code &ec, size_t bytes_transferred)
Definition: asio.cpp:44
boost::asio::ip::tcp::resolver resolver
Definition: asio.hpp:241
void error_handler(const promise< void >::ptr &p, const boost::system::error_code &ec)
Definition: asio.cpp:48
void operator()(const boost::system::error_code &ec, size_t bytes_transferred)
Definition: asio.cpp:19
void set_exception(const fc::exception_ptr &e)
Definition: future.cpp:44
#define elog(FORMAT,...)
Definition: logger.hpp:129
Used to generate a useful error report when an exception is thrown.At each level in the stack where t...
Definition: exception.hpp:56
void set_name(const string &n)
associates a name with this thread.
Definition: thread.cpp:143
#define FC_RETHROW_EXCEPTIONS(LOG_LEVEL, FORMAT,...)
Catchs all exception&#39;s, std::exceptions, and ... and rethrows them after appending the provided log m...
Definition: exception.hpp:464
std::shared_ptr< exception > exception_ptr
Definition: exception.hpp:131
const T & wait(const microseconds &timeout=microseconds::maximum())
Definition: future.hpp:127
static thread & current()
Definition: thread.cpp:125
void error_handler_ec(promise< boost::system::error_code > *p, const boost::system::error_code &ec)
Definition: asio.cpp:67
void set_value(const T &v)
Definition: future.hpp:136
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
Definition: exception.hpp:345
static uint16_t get_num_threads()
Definition: asio.cpp:108
boost::asio::ip::udp::resolver resolver
Definition: asio.hpp:272
read_write_handler(const promise< size_t >::ptr &p)
Definition: asio.cpp:14
Defines exception&#39;s used by fc.
void resolve_handler(const typename promise< std::vector< EndpointType > >::ptr &p, const boost::system::error_code &ec, IteratorType itr)
Definition: asio.cpp:73
boost::asio::io_service * io
Definition: asio.hpp:81
std::string to_string(double)
Definition: string.cpp:73
boost::asio::io_service & default_io_service()
Definition: asio.cpp:182
Definition: api.hpp:15
std::vector< fc::ip::endpoint > resolve(const std::string &host, uint16_t port)
Definition: resolve.cpp:7
read_write_handler_with_buffer(const promise< size_t >::ptr &p, const std::shared_ptr< const char > &buffer)
Definition: asio.cpp:29
static uint16_t num_io_threads
Definition: asio.hpp:86
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
void operator()(const boost::system::error_code &ec, size_t bytes_transferred)
Definition: asio.cpp:34
std::shared_ptr< promise< T > > ptr
Definition: future.hpp:111
static void set_num_threads(uint16_t num_threads)
Definition: asio.cpp:103