BitShares-Core  5.0.0
BitShares blockchain implementation and command-line interface software
tcp_socket.cpp
Go to the documentation of this file.
2 #include <fc/network/ip.hpp>
4 #include <fc/fwd_impl.hpp>
5 #include <fc/asio.hpp>
6 #include <fc/log/logger.hpp>
7 #include <fc/io/stdio.hpp>
9 
10 #if defined _WIN32 || defined WIN32 || defined OS_WIN64 || defined _WIN64 || defined WIN64 || defined WINNT
11 # include <mstcpip.h>
12 #endif
13 
14 #if defined __OpenBSD__
15 # include <sys/types.h>
16 # include <sys/sysctl.h>
17 # include <netinet/tcp_timer.h>
18 # include <netinet/tcp_var.h>
19 #endif
20 
21 namespace fc {
22 
23  namespace detail
24  {
25  bool have_so_reuseport = true;
26  }
27 
29  public:
30  impl() :
31  _sock(fc::asio::default_io_service()),
32  _io_hooks(this)
33  {}
35  {
36  if( _sock.is_open() )
37  try
38  {
39  _sock.close();
40  }
41  catch( ... )
42  {}
43  if( _read_in_progress.valid() )
44  try
45  {
46  _read_in_progress.wait();
47  }
48  catch ( ... )
49  {
50  }
51  if( _write_in_progress.valid() )
52  try
53  {
54  _write_in_progress.wait();
55  }
56  catch ( ... )
57  {
58  }
59  }
60  virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override;
61  virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length, size_t offset) override;
62  virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override;
63  virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length, size_t offset) override;
64 
67  boost::asio::ip::tcp::socket _sock;
69  };
70 
71  size_t tcp_socket::impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length)
72  {
73  return (_read_in_progress = fc::asio::read_some(socket, buffer, length)).wait();
74  }
75  size_t tcp_socket::impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length, size_t offset)
76  {
77  return (_read_in_progress = fc::asio::read_some(socket, buffer, length, offset)).wait();
78  }
79  size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length)
80  {
81  return (_write_in_progress = fc::asio::write_some(socket, buffer, length)).wait();
82  }
83  size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length, size_t offset)
84  {
85  return (_write_in_progress = fc::asio::write_some(socket, buffer, length, offset)).wait();
86  }
87 
88 
90  {
91  my->_sock.open(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0).protocol());
92  }
93 
94  bool tcp_socket::is_open()const {
95  return my->_sock.is_open();
96  }
97 
99 
101 
104  try {
105  if( is_open() )
106  {
107  my->_sock.close();
108  }
109  } FC_RETHROW_EXCEPTIONS( warn, "error closing tcp socket" );
110  }
111 
112  bool tcp_socket::eof()const {
113  return !my->_sock.is_open();
114  }
115 
116  size_t tcp_socket::writesome(const char* buf, size_t len)
117  {
118  return my->_io_hooks->writesome(my->_sock, buf, len);
119  }
120 
121  size_t tcp_socket::writesome(const std::shared_ptr<const char>& buf, size_t len, size_t offset)
122  {
123  return my->_io_hooks->writesome(my->_sock, buf, len, offset);
124  }
125 
127  {
128  try
129  {
130  auto rep = my->_sock.remote_endpoint();
131  return fc::ip::endpoint(rep.address().to_v4().to_ulong(), rep.port() );
132  }
133  FC_RETHROW_EXCEPTIONS( warn, "error getting socket's remote endpoint" );
134  }
135 
136 
138  {
139  try
140  {
141  auto boost_local_endpoint = my->_sock.local_endpoint();
142  return fc::ip::endpoint(boost_local_endpoint.address().to_v4().to_ulong(), boost_local_endpoint.port() );
143  }
144  FC_RETHROW_EXCEPTIONS( warn, "error getting socket's local endpoint" );
145  }
146 
147  size_t tcp_socket::readsome( char* buf, size_t len )
148  {
149  return my->_io_hooks->readsome(my->_sock, buf, len);
150  }
151 
152  size_t tcp_socket::readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset ) {
153  return my->_io_hooks->readsome(my->_sock, buf, len, offset);
154  }
155 
156  void tcp_socket::connect_to( const fc::ip::endpoint& remote_endpoint ) {
157  fc::asio::tcp::connect(my->_sock, fc::asio::tcp::endpoint( boost::asio::ip::address_v4(remote_endpoint.get_address()), remote_endpoint.port() ) );
158  }
159 
160  void tcp_socket::bind(const fc::ip::endpoint& local_endpoint)
161  {
162  try
163  {
164  my->_sock.bind(boost::asio::ip::tcp::endpoint(boost::asio::ip::address_v4(local_endpoint.get_address()),
165  local_endpoint.port()));
166  }
167  catch (const std::exception& except)
168  {
169  elog("Exception binding outgoing connection to desired local endpoint ${endpoint}: ${what}", ("endpoint", local_endpoint)("what", except.what()));
170  FC_THROW("error binding to ${endpoint}: ${what}", ("endpoint", local_endpoint)("what", except.what()));
171  }
172  }
173 
175  {
176  if (interval.count())
177  {
178  boost::asio::socket_base::keep_alive option(true);
179  my->_sock.set_option(option);
180 #if defined _WIN32 || defined WIN32 || defined OS_WIN64 || defined _WIN64 || defined WIN64 || defined WINNT
181  struct tcp_keepalive keepalive_settings;
182  keepalive_settings.onoff = 1;
183  keepalive_settings.keepalivetime = (ULONG)(interval.count() / fc::milliseconds(1).count());
184  keepalive_settings.keepaliveinterval = (ULONG)(interval.count() / fc::milliseconds(1).count());
185 
186  DWORD dwBytesRet = 0;
187  if (WSAIoctl(my->_sock.native_handle(), SIO_KEEPALIVE_VALS, &keepalive_settings, sizeof(keepalive_settings),
188  NULL, 0, &dwBytesRet, NULL, NULL) == SOCKET_ERROR)
189  wlog("Error setting TCP keepalive values");
190 #elif !defined(__clang__) || (__clang_major__ >= 6)
191  // This should work for modern Linuxes and for OSX >= Mountain Lion
192  int timeout_sec = interval.count() / fc::seconds(1).count();
193  if (setsockopt(my->_sock.native_handle(), IPPROTO_TCP,
194  #if defined( __APPLE__ )
195  TCP_KEEPALIVE,
196  #elif defined( __OpenBSD__ )
197  SO_KEEPALIVE,
198  #else
199  TCP_KEEPIDLE,
200  #endif
201  (char*)&timeout_sec, sizeof(timeout_sec)) < 0)
202  wlog("Error setting TCP keepalive idle time");
203 # if defined(__OpenBSD__)
204  int name[4];
205  name[0] = CTL_NET;
206  name[1] = PF_INET;
207  name[2] = IPPROTO_TCP;
208 
209  int value;
210  size_t sz;
211 
212  // get tics per second
213  name[3] = TCPCTL_SLOWHZ;
214  if (sysctl(name, 4, &value, &sz, NULL, 0) == -1)
215  wlog("Error setting TCP keepalive interval");
216 
217  // set interval
218  value *= timeout_sec;
219  name[3] = TCPCTL_KEEPINTVL;
220  if (sysctl(name, 4, NULL, NULL, &value, sizeof(value)) == -1)
221  wlog("Error setting TCP keepalive interval");
222 # elif !defined(__APPLE__) || defined(TCP_KEEPINTVL) // TCP_KEEPINTVL not defined before 10.9
223  if (setsockopt(my->_sock.native_handle(), IPPROTO_TCP, TCP_KEEPINTVL,
224  (char*)&timeout_sec, sizeof(timeout_sec)) < 0)
225  wlog("Error setting TCP keepalive interval");
226 # endif // (__OpenBSD__) or (!__APPLE__ || TCP_KEEPINTVL)
227 #endif // !WIN32
228  }
229  else
230  {
231  boost::asio::socket_base::keep_alive option(false);
232  my->_sock.set_option(option);
233  }
234  }
235 
237  {
238  my->_io_hooks = new_hooks ? new_hooks : &*my;
239  }
240 
241  void tcp_socket::set_reuse_address(bool enable /* = true */)
242  {
243  FC_ASSERT(my->_sock.is_open());
244  boost::asio::socket_base::reuse_address option(enable);
245  my->_sock.set_option(option);
246 #if defined(__APPLE__) || defined(__linux__)
247 # ifndef SO_REUSEPORT
248 # define SO_REUSEPORT 15
249 # endif
250  // OSX needs SO_REUSEPORT in addition to SO_REUSEADDR.
251  // This probably needs to be set for any BSD
253  {
254  int reuseport_value = 1;
255  if (setsockopt(my->_sock.native_handle(), SOL_SOCKET, SO_REUSEPORT,
256  (char*)&reuseport_value, sizeof(reuseport_value)) < 0)
257  {
258  if (errno == ENOPROTOOPT)
260  else
261  wlog("Error setting SO_REUSEPORT");
262  }
263  }
264 #endif // __APPLE__
265  }
266 
267 
269  public:
271  :_accept( fc::asio::default_io_service() )
272  {
273  _accept.open(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0).protocol());
274  }
275 
276  ~impl(){
277  try {
278  _accept.close();
279  }
280  catch ( boost::system::system_error& )
281  {
282  wlog( "unexpected exception ${e}", ("e", fc::except_str()) );
283  }
284  }
285 
286  boost::asio::ip::tcp::acceptor _accept;
287  };
289  if( my && my->_accept.is_open() )
290  my->_accept.close();
291  delete my;
292  my = nullptr;
293  }
295  :my(nullptr) {
296  }
298  delete my;
299  }
300 
301 
303  {
304  try
305  {
306  FC_ASSERT( my != nullptr );
307  fc::asio::tcp::accept( my->_accept, s.my->_sock );
308  } FC_RETHROW_EXCEPTIONS( warn, "Unable to accept connection on socket." );
309  }
310  void tcp_server::set_reuse_address(bool enable /* = true */)
311  {
312  if( !my )
313  my = new impl;
314  boost::asio::ip::tcp::acceptor::reuse_address option(enable);
315  my->_accept.set_option(option);
316 #if defined(__APPLE__) || (defined(__linux__) && defined(SO_REUSEPORT))
317  // OSX needs SO_REUSEPORT in addition to SO_REUSEADDR.
318  // This probably needs to be set for any BSD
320  {
321  int reuseport_value = 1;
322  if (setsockopt(my->_accept.native_handle(), SOL_SOCKET, SO_REUSEPORT,
323  (char*)&reuseport_value, sizeof(reuseport_value)) < 0)
324  {
325  if (errno == ENOPROTOOPT)
327  else
328  wlog("Error setting SO_REUSEPORT");
329  }
330  }
331 #endif // __APPLE__
332  }
333  void tcp_server::listen( uint16_t port )
334  {
335  if( !my )
336  my = new impl;
337  try
338  {
339  my->_accept.bind(boost::asio::ip::tcp::endpoint(boost::asio::ip::address_v4(), port));
340  my->_accept.listen(256);
341  }
342  FC_RETHROW_EXCEPTIONS(warn, "error listening on socket");
343  }
345  {
346  if( !my )
347  my = new impl;
348  try
349  {
350  my->_accept.bind(boost::asio::ip::tcp::endpoint(boost::asio::ip::address_v4::from_string((string)ep.get_address()), ep.port()));
351  my->_accept.listen();
352  }
353  FC_RETHROW_EXCEPTIONS(warn, "error listening on socket");
354  }
355 
357  {
358  FC_ASSERT( my != nullptr );
359  return fc::ip::endpoint(my->_accept.local_endpoint().address().to_v4().to_ulong(),
360  my->_accept.local_endpoint().port() );
361  }
362 
363  uint16_t tcp_server::get_port()const
364  {
365  FC_ASSERT( my != nullptr );
366  return my->_accept.local_endpoint().port();
367  }
368 
369 
370 
371 } // namespace fc
uint16_t get_port() const
Definition: tcp_socket.cpp:363
std::string except_str()
Definition: exception.cpp:272
virtual void close()
Definition: tcp_socket.cpp:103
future< size_t > write_some(AsyncWriteStream &s, const ConstBufferSequence &buf)
wraps boost::asio::async_write_some
Definition: asio.hpp:193
void set_io_hooks(tcp_socket_io_hooks *new_hooks)
Definition: tcp_socket.cpp:236
fc::ip::endpoint local_endpoint() const
Definition: tcp_socket.cpp:137
virtual size_t writesome(boost::asio::ip::tcp::socket &socket, const char *buffer, size_t length) override
Definition: tcp_socket.cpp:79
boost::asio::ip::tcp::acceptor _accept
Definition: tcp_socket.cpp:286
#define elog(FORMAT,...)
Definition: logger.hpp:129
microseconds milliseconds(int64_t s)
Definition: time.hpp:35
void bind(const fc::ip::endpoint &local_endpoint)
Definition: tcp_socket.cpp:160
boost::asio::ip::tcp::socket _sock
Definition: tcp_socket.cpp:67
void enable_keep_alives(const fc::microseconds &interval)
Definition: tcp_socket.cpp:174
virtual bool eof() const
Definition: tcp_socket.cpp:112
fc::ip::endpoint get_local_endpoint() const
Definition: tcp_socket.cpp:356
#define FC_THROW(...)
Definition: exception.hpp:366
void connect(AsyncSocket &sock, const EndpointType &ep)
wraps boost::asio::socket::async_connect
Definition: asio.hpp:263
#define FC_RETHROW_EXCEPTIONS(LOG_LEVEL, FORMAT,...)
Catchs all exception&#39;s, std::exceptions, and ... and rethrows them after appending the provided log m...
Definition: exception.hpp:463
fc::future< size_t > _write_in_progress
Definition: tcp_socket.cpp:65
#define wlog(FORMAT,...)
Definition: logger.hpp:123
void accept(AcceptorType &acc, SocketType &sock)
wraps boost::asio::async_accept
Definition: asio.hpp:251
bool have_so_reuseport
Definition: tcp_socket.cpp:25
const address & get_address() const
Definition: ip.cpp:72
void set_reuse_address(bool enable=true)
Definition: tcp_socket.cpp:310
virtual size_t readsome(boost::asio::ip::tcp::socket &socket, char *buffer, size_t length) override
Definition: tcp_socket.cpp:71
microseconds seconds(int64_t s)
Definition: time.hpp:34
uint16_t port() const
Definition: ip.cpp:71
int64_t count() const
Definition: time.hpp:28
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
Definition: exception.hpp:345
void connect_to(const fc::ip::endpoint &remote_endpoint)
Definition: tcp_socket.cpp:156
boost::asio::ip::tcp::endpoint endpoint
Definition: asio.hpp:240
void listen(uint16_t port)
Definition: tcp_socket.cpp:333
bool is_open() const
Definition: tcp_socket.cpp:94
virtual void flush()
Definition: tcp_socket.cpp:102
virtual size_t readsome(char *buffer, size_t max)
Definition: tcp_socket.cpp:147
Defines exception&#39;s used by fc.
future< size_t > read_some(AsyncReadStream &s, const MutableBufferSequence &buf)
Definition: asio.hpp:123
boost::asio::io_service & default_io_service()
Definition: asio.cpp:182
Definition: api.hpp:15
tcp_socket_io_hooks * _io_hooks
Definition: tcp_socket.cpp:68
void accept(tcp_socket &s)
Definition: tcp_socket.cpp:302
fc::future< size_t > _read_in_progress
Definition: tcp_socket.cpp:66
void set_reuse_address(bool enable=true)
Definition: tcp_socket.cpp:241
fc::ip::endpoint remote_endpoint() const
Definition: tcp_socket.cpp:126
virtual size_t writesome(const char *buffer, size_t len)
Definition: tcp_socket.cpp:116