BitShares-Core  4.0.0
BitShares blockchain implementation and command-line interface software
asio.hpp
Go to the documentation of this file.
1 
5 #pragma once
6 #include <boost/asio.hpp>
7 #include <boost/bind.hpp>
8 #include <boost/thread.hpp>
9 #include <vector>
10 #include <fc/thread/future.hpp>
11 #include <fc/io/iostream.hpp>
12 
13 namespace fc {
17 namespace asio {
21  namespace detail {
22 
24  {
25  public:
27  void operator()(const boost::system::error_code& ec, size_t bytes_transferred);
28  private:
29  promise<size_t>::ptr _completion_promise;
30  };
31 
33  {
34  public:
36  const std::shared_ptr<const char>& buffer);
37  void operator()(const boost::system::error_code& ec, size_t bytes_transferred);
38  private:
39  promise<size_t>::ptr _completion_promise;
40  std::shared_ptr<const char> _buffer;
41  };
42 
43  //void read_write_handler( const promise<size_t>::ptr& p,
44  // const boost::system::error_code& ec,
45  // size_t bytes_transferred );
47  boost::system::error_code* oec,
48  const boost::system::error_code& ec,
49  size_t bytes_transferred );
50  void error_handler( const promise<void>::ptr& p,
51  const boost::system::error_code& ec );
53  const boost::system::error_code& ec );
54 
55  template<typename C>
56  struct non_blocking {
57  bool operator()( C& c ) { return c.non_blocking(); }
58  bool operator()( C& c, bool s ) { c.non_blocking(s); return true; }
59  };
60 
61 #if WIN32 // windows stream handles do not support non blocking!
62  template<>
63  struct non_blocking<boost::asio::windows::stream_handle> {
64  typedef boost::asio::windows::stream_handle C;
65  bool operator()( C& ) { return false; }
66  bool operator()( C&, bool ) { return false; }
67  };
68 #endif
69  } // end of namespace detail
70 
71  /***
72  * A structure for holding the boost io service and associated
73  * threads
74  */
76  {
77  public:
80  static void set_num_threads(uint16_t num_threads);
81  static uint16_t get_num_threads();
82  boost::asio::io_service* io;
83  private:
84  std::vector<boost::thread*> asio_threads;
85  boost::asio::io_service::work* the_work;
86  protected:
87  static uint16_t num_io_threads; // marked protected to help with testing
88  };
89 
96  boost::asio::io_service& default_io_service();
97 
103  template<typename AsyncReadStream, typename MutableBufferSequence>
104  size_t read( AsyncReadStream& s, const MutableBufferSequence& buf ) {
105  promise<size_t>::ptr p = promise<size_t>::create("fc::asio::read");
106  boost::asio::async_read( s, buf, detail::read_write_handler(p) );
107  return p->wait();
108  }
122  template<typename AsyncReadStream, typename MutableBufferSequence>
123  future<size_t> read_some(AsyncReadStream& s, const MutableBufferSequence& buf)
124  {
125  promise<size_t>::ptr completion_promise = promise<size_t>::create("fc::asio::async_read_some");
126  s.async_read_some(buf, detail::read_write_handler(completion_promise));
127  return completion_promise;//->wait();
128  }
129 
130  template<typename AsyncReadStream>
131  future<size_t> read_some(AsyncReadStream& s, char* buffer, size_t length, size_t offset = 0)
132  {
133  promise<size_t>::ptr completion_promise = promise<size_t>::create("fc::asio::async_read_some");
134  s.async_read_some(boost::asio::buffer(buffer + offset, length),
135  detail::read_write_handler(completion_promise));
136  return completion_promise;//->wait();
137  }
138 
139  template<typename AsyncReadStream>
140  future<size_t> read_some(AsyncReadStream& s, const std::shared_ptr<char>& buffer, size_t length, size_t offset)
141  {
142  promise<size_t>::ptr completion_promise = promise<size_t>::create("fc::asio::async_read_some");
143  s.async_read_some(boost::asio::buffer(buffer.get() + offset, length),
144  detail::read_write_handler_with_buffer(completion_promise, buffer));
145  return completion_promise;//->wait();
146  }
147 
148  template<typename AsyncReadStream, typename MutableBufferSequence>
149  void async_read_some(AsyncReadStream& s, const MutableBufferSequence& buf, promise<size_t>::ptr completion_promise)
150  {
151  s.async_read_some(buf, detail::read_write_handler(completion_promise));
152  }
153 
154  template<typename AsyncReadStream>
155  void async_read_some(AsyncReadStream& s, char* buffer,
156  size_t length, promise<size_t>::ptr completion_promise)
157  {
158  s.async_read_some(boost::asio::buffer(buffer, length), detail::read_write_handler(completion_promise));
159  }
160 
161  template<typename AsyncReadStream>
162  void async_read_some(AsyncReadStream& s, const std::shared_ptr<char>& buffer,
163  size_t length, size_t offset, promise<size_t>::ptr completion_promise)
164  {
165  s.async_read_some(boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(completion_promise, buffer));
166  }
167 
168  template<typename AsyncReadStream>
169  size_t read_some( AsyncReadStream& s, boost::asio::streambuf& buf )
170  {
171  char buffer[1024];
172  size_t bytes_read = read_some( s, boost::asio::buffer( buffer, sizeof(buffer) ) );
173  buf.sputn( buffer, bytes_read );
174  return bytes_read;
175  }
176 
180  template<typename AsyncWriteStream, typename ConstBufferSequence>
181  size_t write( AsyncWriteStream& s, const ConstBufferSequence& buf ) {
182  promise<size_t>::ptr p = promise<size_t>::create("fc::asio::write");
183  boost::asio::async_write(s, buf, detail::read_write_handler(p));
184  return p->wait();
185  }
186 
192  template<typename AsyncWriteStream, typename ConstBufferSequence>
193  future<size_t> write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) {
194  promise<size_t>::ptr p = promise<size_t>::create("fc::asio::write_some");
195  s.async_write_some( buf, detail::read_write_handler(p));
196  return p; //->wait();
197  }
198 
199  template<typename AsyncWriteStream>
200  future<size_t> write_some( AsyncWriteStream& s, const char* buffer,
201  size_t length, size_t offset = 0) {
202  promise<size_t>::ptr p = promise<size_t>::create("fc::asio::write_some");
203  s.async_write_some( boost::asio::buffer(buffer + offset, length), detail::read_write_handler(p));
204  return p; //->wait();
205  }
206 
207  template<typename AsyncWriteStream>
208  future<size_t> write_some( AsyncWriteStream& s, const std::shared_ptr<const char>& buffer,
209  size_t length, size_t offset ) {
210  promise<size_t>::ptr p = promise<size_t>::create("fc::asio::write_some");
211  s.async_write_some( boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(p, buffer));
212  return p; //->wait();
213  }
214 
220  template<typename AsyncWriteStream, typename ConstBufferSequence>
221  void async_write_some(AsyncWriteStream& s, const ConstBufferSequence& buf, promise<size_t>::ptr completion_promise) {
222  s.async_write_some(buf, detail::read_write_handler(completion_promise));
223  }
224 
225  template<typename AsyncWriteStream>
226  void async_write_some(AsyncWriteStream& s, const char* buffer,
227  size_t length, promise<size_t>::ptr completion_promise) {
228  s.async_write_some(boost::asio::buffer(buffer, length),
229  detail::read_write_handler(completion_promise));
230  }
231 
232  template<typename AsyncWriteStream>
233  void async_write_some(AsyncWriteStream& s, const std::shared_ptr<const char>& buffer,
234  size_t length, size_t offset, promise<size_t>::ptr completion_promise) {
235  s.async_write_some(boost::asio::buffer(buffer.get() + offset, length),
236  detail::read_write_handler_with_buffer(completion_promise, buffer));
237  }
238 
239  namespace tcp {
241  typedef boost::asio::ip::tcp::resolver::iterator resolver_iterator;
243  std::vector<endpoint> resolve( const std::string& hostname, const std::string& port );
244 
250  template<typename SocketType, typename AcceptorType>
251  void accept( AcceptorType& acc, SocketType& sock ) {
252  promise<void>::ptr p = promise<void>::create("fc::asio::tcp::accept");
253  acc.async_accept( sock, boost::bind( fc::asio::detail::error_handler, p, _1 ) );
254  p->wait();
255  //if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) );
256  }
257 
262  template<typename AsyncSocket, typename EndpointType>
263  void connect( AsyncSocket& sock, const EndpointType& ep ) {
264  promise<void>::ptr p = promise<void>::create("fc::asio::tcp::connect");
265  sock.async_connect( ep, boost::bind( fc::asio::detail::error_handler, p, _1 ) );
266  p->wait();
267  //if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) );
268  }
269  }
270  namespace udp {
272  typedef boost::asio::ip::udp::resolver::iterator resolver_iterator;
275  std::vector<endpoint> resolve( resolver& r, const std::string& hostname,
276  const std::string& port );
277  }
278 
279  template<typename AsyncReadStream>
280  class istream : public virtual fc::istream
281  {
282  public:
283  istream( std::shared_ptr<AsyncReadStream> str )
284  :_stream( std::move(str) ){}
285 
286  virtual size_t readsome( char* buf, size_t len )
287  {
288  return fc::asio::read_some(*_stream, buf, len).wait();
289  }
290  virtual size_t readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset )
291  {
292  return fc::asio::read_some(*_stream, buf, len, offset).wait();
293  }
294 
295  private:
296  std::shared_ptr<AsyncReadStream> _stream;
297  };
298 
299  template<typename AsyncWriteStream>
300  class ostream : public virtual fc::ostream
301  {
302  public:
303  ostream( std::shared_ptr<AsyncWriteStream> str )
304  :_stream( std::move(str) ){}
305 
306  virtual size_t writesome( const char* buf, size_t len )
307  {
308  return fc::asio::write_some(*_stream, buf, len).wait();
309  }
310 
311  virtual size_t writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset )
312  {
313  return fc::asio::write_some(*_stream, buf, len, offset).wait();
314  }
315 
316  virtual void close(){ _stream->close(); }
317  virtual void flush() {}
318  private:
319  std::shared_ptr<AsyncWriteStream> _stream;
320  };
321 
322 
323 } } // namespace fc::asio
324 
static ptr create(const char *desc FC_TASK_NAME_DEFAULT_ARG)
Definition: future.hpp:114
void read_write_handler_ec(promise< size_t > *p, boost::system::error_code *oec, const boost::system::error_code &ec, size_t bytes_transferred)
Definition: asio.cpp:44
future< size_t > write_some(AsyncWriteStream &s, const ConstBufferSequence &buf)
wraps boost::asio::async_write_some
Definition: asio.hpp:193
boost::asio::ip::udp::endpoint endpoint
Definition: asio.hpp:271
void error_handler(const promise< void >::ptr &p, const boost::system::error_code &ec)
Definition: asio.cpp:48
void operator()(const boost::system::error_code &ec, size_t bytes_transferred)
Definition: asio.cpp:19
virtual size_t readsome(const std::shared_ptr< char > &buf, size_t len, size_t offset)
Definition: asio.hpp:290
virtual size_t writesome(const char *buf, size_t len)
Definition: asio.hpp:306
istream(std::shared_ptr< AsyncReadStream > str)
Definition: asio.hpp:283
size_t read(AsyncReadStream &s, const MutableBufferSequence &buf)
wraps boost::asio::async_read
Definition: asio.hpp:104
void connect(AsyncSocket &sock, const EndpointType &ep)
wraps boost::asio::socket::async_connect
Definition: asio.hpp:263
void accept(AcceptorType &acc, SocketType &sock)
wraps boost::asio::async_accept
Definition: asio.hpp:251
void async_read_some(AsyncReadStream &s, const MutableBufferSequence &buf, promise< size_t >::ptr completion_promise)
Definition: asio.hpp:149
const T & wait(const microseconds &timeout=microseconds::maximum())
Definition: future.hpp:127
virtual size_t readsome(char *buf, size_t len)
Definition: asio.hpp:286
void error_handler_ec(promise< boost::system::error_code > *p, const boost::system::error_code &ec)
Definition: asio.cpp:67
boost::asio::ip::udp::resolver resolver
Definition: asio.hpp:273
read_write_handler(const promise< size_t >::ptr &p)
Definition: asio.cpp:14
virtual void close()
Definition: asio.hpp:316
future< size_t > read_some(AsyncReadStream &s, const MutableBufferSequence &buf)
Definition: asio.hpp:123
boost::asio::io_service * io
Definition: asio.hpp:82
void async_write_some(AsyncWriteStream &s, const ConstBufferSequence &buf, promise< size_t >::ptr completion_promise)
wraps boost::asio::async_write_some
Definition: asio.hpp:221
ostream(std::shared_ptr< AsyncWriteStream > str)
Definition: asio.hpp:303
const T & wait(const microseconds &timeout=microseconds::maximum()) const
Definition: future.hpp:228
boost::asio::io_service & default_io_service()
Definition: asio.cpp:182
Definition: api.hpp:15
size_t write(AsyncWriteStream &s, const ConstBufferSequence &buf)
wraps boost::asio::async_write
Definition: asio.hpp:181
std::vector< fc::ip::endpoint > resolve(const std::string &host, uint16_t port)
Definition: resolve.cpp:7
static uint16_t num_io_threads
Definition: asio.hpp:87
virtual void flush()
Definition: asio.hpp:317
std::shared_ptr< promise< T > > ptr
Definition: future.hpp:111
virtual size_t writesome(const std::shared_ptr< const char > &buf, size_t len, size_t offset)
Definition: asio.hpp:311
boost::asio::ip::udp::resolver::iterator resolver_iterator
Definition: asio.hpp:272
bool operator()(C &c, bool s)
Definition: asio.hpp:58