BitShares-Core  6.1.0
BitShares blockchain implementation and command-line interface software
message_oriented_connection.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Cryptonomex, Inc., and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 #include <fc/thread/thread.hpp>
25 #include <fc/thread/mutex.hpp>
27 #include <fc/thread/future.hpp>
28 #include <fc/log/logger.hpp>
29 #include <fc/io/enum_type.hpp>
30 
33 #include <graphene/net/config.hpp>
34 
35 #include <atomic>
36 
// Replace any DEFAULT_LOGGER defined by earlier includes with "p2p" for this file.
// Presumably consumed by the wlog/elog/ilog/dlog macros used below -- confirm in fc/log/logger.hpp.
37 #ifdef DEFAULT_LOGGER
38 # undef DEFAULT_LOGGER
39 #endif
40 #define DEFAULT_LOGGER "p2p"
41 
// Debug-only thread check: asserts that the calling thread is the one stored in _thread.
// When NDEBUG is defined it expands to an empty statement, matching the fact that
// the _thread member only exists in builds without NDEBUG.
42 #ifndef NDEBUG
43 # define VERIFY_CORRECT_THREAD() assert(_thread->is_current())
44 #else
45 # define VERIFY_CORRECT_THREAD() do {} while (0)
46 #endif
47 
48 namespace graphene { namespace net {
49  namespace detail
50  {
52  {
     // NOTE(review): this listing skips several original lines of the class: the class-head
     // before this brace and the declarations at listing lines 54-55, 75, 80, 82, 91-92 and 94.
     // The constructor initializes members named _self and _delegate, so they are presumably
     // declared on the skipped lines 54-55 -- confirm against the real source file.
53  private:
     // Socket wrapper used for every accept/connect/bind/read/write/close in this file.
56  stcp_socket _sock;
     // Fulfilled by accept()/connect_to() once the read loop has been launched; send_message()
     // waits on it; destroy_connection() completes it with a canceled_exception.
57  fc::promise<void>::ptr _ready_for_sending;
     // Future of the asynchronous read_loop() task; canceled and awaited in destroy_connection().
58  fc::future<void> _read_loop_done;
     // Running byte totals, including the padding bytes read and written.
59  uint64_t _bytes_received;
60  uint64_t _bytes_sent;
61 
     // Set when read_loop() starts, after each received message, and after each sent message.
62  fc::time_point _connected_time;
63  fc::time_point _last_message_received_time;
64  fc::time_point _last_message_sent_time;
65 
     // Flags handed to no_parallel_execution_guard to reject overlapping executions of
     // send_message() and read_loop() respectively.
66  std::atomic_bool _send_message_in_progress;
67  std::atomic_bool _read_loop_in_progress;
68 #ifndef NDEBUG
     // Thread that constructed this object; read by VERIFY_CORRECT_THREAD().
69  fc::thread* _thread;
70 #endif
71 
72  void read_loop();
     // NOTE(review): no definition of start_read_loop() appears in this listing.
73  void start_read_loop();
74  public:
76  void accept();
77  void connect_to(const fc::ip::endpoint& remote_endpoint);
78  void bind(const fc::ip::endpoint& local_endpoint);
79 
     // Tail of the constructor declaration; its first line (listing line 80) is not shown.
81  message_oriented_connection_delegate* delegate = nullptr);
83 
84  void send_message(const message& message_to_send);
85  void close_connection();
86  void destroy_connection();
87 
88  uint64_t get_total_bytes_sent() const;
89  uint64_t get_total_bytes_received() const;
90 
     // Time recorded at the start of read_loop().
93  fc::time_point get_connection_time() const { return _connected_time; }
95  };
96 
99  : _self(self),
     // Constructor initializer list; the signature lines (listing lines 97-98) are not shown.
     // Stores the owning connection and delegate, creates the unfulfilled "ready for sending"
     // promise, zeroes the byte counters and clears both in-progress flags.
100  _delegate(delegate),
101  _ready_for_sending(fc::promise<void>::create()),
102  _bytes_received(0),
103  _bytes_sent(0),
104  _send_message_in_progress(false),
105  _read_loop_in_progress(false)
106 #ifndef NDEBUG
     // Remember the constructing thread so VERIFY_CORRECT_THREAD() can compare against it.
107  ,_thread(&fc::thread::current())
108 #endif
109  {
110  }
112  {
     // NOTE(review): listing lines 111 and 113-114 are missing, so neither the signature nor
     // the statements of this block are visible here. Presumably the impl destructor -- confirm.
115  }
116 
118  {
     // Exposes the socket object held inside the stcp_socket wrapper.
     // (Signature line and listing line 119 are not shown; presumably get_socket().)
120  return _sock.get_socket();
121  }
122 
124  {
     // Signature line not shown; presumably accept(). Accepts on the socket wrapper, launches
     // read_loop() as an asynchronous task, then fulfills _ready_for_sending so that
     // send_message() can get past its wait.
126  _sock.accept();
127  assert(!_read_loop_done.valid()); // check to be sure we never launch two read loops
128  _read_loop_done = fc::async([=](){ read_loop(); }, "message read_loop");
129  _ready_for_sending->set_value();
130  }
131 
133  {
     // Signature line not shown; presumably connect_to(remote_endpoint). Connects the socket
     // wrapper to the remote endpoint, launches read_loop() as an asynchronous task, then
     // fulfills _ready_for_sending so that send_message() can get past its wait.
135  _sock.connect_to(remote_endpoint);
136  assert(!_read_loop_done.valid()); // check to be sure we never launch two read loops
137  _read_loop_done = fc::async([=](){ read_loop(); }, "message read_loop");
138  _ready_for_sending->set_value();
139  }
140 
142  {
     // Signature line not shown; presumably bind(local_endpoint). Binds the socket wrapper
     // to the given local endpoint.
144  _sock.bind(local_endpoint);
145  }
146 
148  {
     // Scope guard over an atomic flag. (The class-head, listing line 147, is not shown.)
     // Flag that is held at true while a guard instance exists.
149  std::atomic_bool* _flag;
150  public:
151  explicit no_parallel_execution_guard(std::atomic_bool* flag) : _flag(flag)
152  {
     // Atomically switch the flag false -> true; if it was already true the FC_ASSERT
     // fails, i.e. another execution of the guarded section is still in progress.
153  bool expected = false;
154  FC_ASSERT( flag->compare_exchange_strong( expected, true ), "Only one thread at time can visit it");
155  }
     // Presumably the destructor (its signature, listing line 156, is not shown):
     // releases the flag again.
157  {
158  *_flag = false;
159  }
160  };
161 
162  void message_oriented_connection_impl::read_loop()
163  {
165  const int BUFFER_SIZE = 16;
166  const int LEFTOVER = BUFFER_SIZE - sizeof(message_header);
167  static_assert(BUFFER_SIZE >= sizeof(message_header), "insufficient buffer");
168 
169  no_parallel_execution_guard guard( &_read_loop_in_progress );
170 
171  _connected_time = fc::time_point::now();
172 
173  fc::oexception exception_to_rethrow;
174  bool call_on_connection_closed = false;
175 
176  try
177  {
178  message m;
179  char buffer[BUFFER_SIZE];
180  while( true )
181  {
182  _sock.read(buffer, BUFFER_SIZE);
183  _bytes_received += BUFFER_SIZE;
184  memcpy((char*)&m, buffer, sizeof(message_header));
185  FC_ASSERT( m.size.value() <= MAX_MESSAGE_SIZE, "", ("m.size",m.size.value())("MAX_MESSAGE_SIZE",MAX_MESSAGE_SIZE) );
186 
187  size_t remaining_bytes_with_padding = 16 * ((m.size.value() - LEFTOVER + 15) / 16);
188  m.data.resize(LEFTOVER + remaining_bytes_with_padding); //give extra 16 bytes to allow for padding added in send call
189  std::copy(buffer + sizeof(message_header), buffer + sizeof(buffer), m.data.begin());
190  if (remaining_bytes_with_padding)
191  {
192  _sock.read(&m.data[LEFTOVER], remaining_bytes_with_padding);
193  _bytes_received += remaining_bytes_with_padding;
194  }
195  m.data.resize(m.size.value()); // truncate off the padding bytes
196 
197  _last_message_received_time = fc::time_point::now();
198 
199  try
200  {
201  // message handling errors are warnings...
202  _delegate->on_message(_self, m);
203  }
205  catch ( const fc::canceled_exception& e ) { throw; }
206  catch ( const fc::eof_exception& e ) { throw; }
207  catch ( const fc::exception& e)
208  {
210  wlog( "message transmission failed ${er}", ("er", e.to_detail_string() ) );
211  throw;
212  }
213  }
214  }
215  catch ( const fc::canceled_exception& e )
216  {
217  wlog( "caught a canceled_exception in read_loop. this should mean we're in the process of deleting this object already, so there's no need to notify the delegate: ${e}", ("e", e.to_detail_string() ) );
218  throw;
219  }
220  catch ( const fc::eof_exception& e )
221  {
222  wlog( "disconnected ${e}", ("e", e.to_detail_string() ) );
223  call_on_connection_closed = true;
224  }
225  catch ( const fc::exception& e )
226  {
227  elog( "disconnected ${er}", ("er", e.to_detail_string() ) );
228  call_on_connection_closed = true;
229  exception_to_rethrow = fc::unhandled_exception(FC_LOG_MESSAGE(warn, "disconnected: ${e}", ("e", e.to_detail_string())));
230  }
231  catch ( const std::exception& e )
232  {
233  elog( "disconnected ${er}", ("er", e.what() ) );
234  call_on_connection_closed = true;
235  exception_to_rethrow = fc::unhandled_exception(FC_LOG_MESSAGE(warn, "disconnected: ${e}", ("e", e.what())));
236  }
237  catch ( ... )
238  {
239  elog( "unexpected exception" );
240  call_on_connection_closed = true;
241  exception_to_rethrow = fc::unhandled_exception(FC_LOG_MESSAGE(warn, "disconnected: ${e}", ("e", fc::except_str())));
242  }
243 
244  if (call_on_connection_closed)
245  _delegate->on_connection_closed(_self);
246 
247  if (exception_to_rethrow)
248  throw *exception_to_rethrow;
249  }
250 
252  {
     // Signature line not shown; the class declares send_message(const message& message_to_send).
     // Sends one message: header + payload, zero-padded to a multiple of 16 bytes.
     // The guard below makes an overlapping call fail its FC_ASSERT; the wait blocks until
     // accept()/connect_to() fulfilled _ready_for_sending, or fails once
     // destroy_connection() has stored a canceled_exception in it.
254 #if 0 // this gets too verbose
255 #ifndef NDEBUG
256  fc::optional<fc::ip::endpoint> remote_endpoint;
257  if (_sock.get_socket().is_open())
258  remote_endpoint = _sock.get_socket().remote_endpoint();
259  struct scope_logger {
261  scope_logger(const fc::optional<fc::ip::endpoint>& endpoint) : endpoint(endpoint) { dlog("entering message_oriented_connection::send_message() for peer ${endpoint}", ("endpoint", endpoint)); }
262  ~scope_logger() { dlog("leaving message_oriented_connection::send_message() for peer ${endpoint}", ("endpoint", endpoint)); }
263  } send_message_scope_logger(remote_endpoint);
264 #endif
265 #endif
266  no_parallel_execution_guard guard( &_send_message_in_progress );
267  _ready_for_sending->wait();
268 
269  try
270  {
271  size_t size_of_message_and_header = sizeof(message_header) + message_to_send.size.value();
     // An oversized message is only logged here; the send below is still attempted.
272  if( message_to_send.size.value() > MAX_MESSAGE_SIZE )
273  elog("Trying to send a message larger than MAX_MESSAGE_SIZE. This probably won't work...");
274  //pad the message we send to a multiple of 16 bytes
275  size_t size_with_padding = 16 * ((size_of_message_and_header + 15) / 16);
276  std::vector<char> padded_message( size_with_padding );
277 
     // Layout of the buffer: [message_header][payload of size.value() bytes][zero padding].
     // NOTE(review): the payload copy trusts message_to_send.size to match data.size().
278  memcpy( padded_message.data(), (const char*)&message_to_send, sizeof(message_header) );
279  memcpy( padded_message.data() + sizeof(message_header), message_to_send.data.data(),
280  message_to_send.size.value() );
281  char* padding_space = padded_message.data() + sizeof(message_header) + message_to_send.size.value();
282  memset(padding_space, 0, size_with_padding - size_of_message_and_header);
283  _sock.write( padded_message.data(), size_with_padding );
284  _sock.flush();
     // Statistics are updated only after the write and flush returned without throwing.
285  _bytes_sent += size_with_padding;
286  _last_message_sent_time = fc::time_point::now();
287  } FC_RETHROW_EXCEPTIONS( warn, "unable to send message" )
288  }
289 
291  {
     // Signature line not shown; presumably close_connection(). Closes the socket wrapper.
293  _sock.close();
294  }
295 
297  {
     // Signature line not shown; presumably destroy_connection(). Tears the connection down:
     // logs the peer, checks that no send is in flight, cancels the read loop and waits
     // for it, then fails the "ready for sending" promise.
299 
     // Look up the remote endpoint only for the log line; stays empty if the socket is closed.
300  fc::optional<fc::ip::endpoint> remote_endpoint;
301  if (_sock.get_socket().is_open())
302  remote_endpoint = _sock.get_socket().remote_endpoint();
303  ilog( "in destroy_connection() for ${endpoint}", ("endpoint", remote_endpoint) );
304 
     // A send still in progress is logged in every build and additionally asserts when
     // NDEBUG is not defined.
305  if (_send_message_in_progress)
306  elog("Error: message_oriented_connection is being destroyed while a send_message is in progress. "
307  "The task calling send_message() should have been canceled already");
308  assert(!_send_message_in_progress);
309 
     // Best effort: any exception from canceling/waiting on the read loop is logged and ignored.
310  try
311  {
312  _read_loop_done.cancel_and_wait(__FUNCTION__);
313  }
314  catch ( const fc::exception& e )
315  {
316  wlog( "Exception thrown while canceling message_oriented_connection's read_loop, ignoring: ${e}", ("e",e) );
317  }
318  catch (...)
319  {
320  wlog( "Exception thrown while canceling message_oriented_connection's read_loop, ignoring" );
321  }
     // Complete the promise with a canceled_exception; send_message() waits on this promise.
322  _ready_for_sending->set_exception( std::make_shared<fc::canceled_exception>() );
323  }
324 
326  {
     // Signature line not shown; presumably get_total_bytes_sent().
     // Total bytes written by send_message(), padding included.
328  return _bytes_sent;
329  }
330 
332  {
     // Signature line not shown; presumably get_total_bytes_received().
     // Total bytes read by read_loop(), padding included.
334  return _bytes_received;
335  }
336 
338  {
     // Signature line not shown; presumably get_last_message_sent_time().
     // Time recorded after the most recent successful write+flush in send_message().
340  return _last_message_sent_time;
341  }
342 
344  {
     // Signature line not shown; presumably get_last_message_received_time().
     // Time recorded in read_loop() after the most recent complete message was read.
346  return _last_message_received_time;
347  }
348 
350  {
     // Signature line not shown; presumably get_shared_secret().
     // Returns whatever the socket wrapper reports as its shared secret.
352  return _sock.get_shared_secret();
353  }
354 
355  } // end namespace graphene::net::detail
356 
357 
359  my( std::make_unique<detail::message_oriented_connection_impl>(this, delegate) )
     // Public-class constructor (signature line not shown): creates the implementation
     // object, passing it this connection and the caller's delegate.
360  {
361  }
362 
364  {
     // Empty body; signature line (listing line 363) not shown -- presumably the destructor.
365  }
366 
368  {
     // Forwards to the implementation object's get_socket(). (Signature line not shown.)
369  return my->get_socket();
370  }
371 
373  {
     // Forwards to the implementation object's accept(). (Signature line not shown.)
374  my->accept();
375  }
376 
378  {
     // Forwards to the implementation object's connect_to(). (Signature line not shown.)
379  my->connect_to(remote_endpoint);
380  }
381 
383  {
     // Forwards to the implementation object's bind(). (Signature line not shown.)
384  my->bind(local_endpoint);
385  }
386 
388  {
     // Forwards to the implementation object's send_message(). (Signature line not shown.)
389  my->send_message(message_to_send);
390  }
391 
393  {
     // Forwards to the implementation object's close_connection(). (Signature line not shown.)
394  my->close_connection();
395  }
396 
398  {
     // Forwards to the implementation object's destroy_connection(). (Signature line not shown.)
399  my->destroy_connection();
400  }
401 
403  {
     // Forwards to the implementation object's get_total_bytes_sent(). (Signature line not shown.)
404  return my->get_total_bytes_sent();
405  }
406 
408  {
     // Forwards to the implementation object's get_total_bytes_received(). (Signature line not shown.)
409  return my->get_total_bytes_received();
410  }
411 
413  {
     // Forwards to the implementation object's get_last_message_sent_time(). (Signature line not shown.)
414  return my->get_last_message_sent_time();
415  }
416 
418  {
     // Forwards to the implementation object's get_last_message_received_time(). (Signature line not shown.)
419  return my->get_last_message_received_time();
420  }
422  {
     // Forwards to the implementation object's get_connection_time(). (Signature line not shown.)
423  return my->get_connection_time();
424  }
426  {
     // Forwards to the implementation object's get_shared_secret(). (Signature line not shown.)
427  return my->get_shared_secret();
428  }
429 
430 } } // end namespace graphene::net
std::string except_str()
Definition: exception.cpp:272
message_oriented_connection_impl(message_oriented_connection *self, message_oriented_connection_delegate *delegate=nullptr)
auto async(Functor &&f, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> fc::future< decltype(f())>
Definition: thread.hpp:227
void connect_to(const fc::ip::endpoint &remote_endpoint)
Definition: stcp_socket.cpp:72
Definition: api.cpp:48
#define elog(FORMAT,...)
Definition: logger.hpp:129
void connect_to(const fc::ip::endpoint &remote_endpoint)
Used to generate a useful error report when an exception is thrown. At each level in the stack where t...
Definition: exception.hpp:56
#define FC_RETHROW_EXCEPTIONS(LOG_LEVEL, FORMAT,...)
Catches all exceptions, std::exceptions, and ... and rethrows them after appending the provided log m...
Definition: exception.hpp:464
#define wlog(FORMAT,...)
Definition: logger.hpp:123
message_oriented_connection(message_oriented_connection_delegate *delegate=nullptr)
provides stack-based nullable value similar to boost::optional
Definition: optional.hpp:20
fc::tcp_socket & get_socket()
Definition: stcp_socket.hpp:40
re-thrown whenever an unhandled exception is caught. Any exceptions thrown by 3rd party libraries that...
Definition: exception.hpp:146
#define ilog(FORMAT,...)
Definition: logger.hpp:117
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
Definition: exception.hpp:345
boost::asio::ip::tcp::endpoint endpoint
Definition: asio.hpp:239
bool valid() const
Definition: future.hpp:311
std::string to_detail_string(log_level ll=log_level::all) const
Definition: exception.cpp:183
void send_message(const message &message_to_send)
#define VERIFY_CORRECT_THREAD()
#define dlog(FORMAT,...)
Definition: logger.hpp:100
void bind(const fc::ip::endpoint &local_endpoint)
Definition: api.hpp:15
boost::endian::little_uint32_buf_t size
Definition: message.hpp:43
void bind(const fc::ip::endpoint &local_endpoint)
Definition: stcp_socket.cpp:78
void copy(const path &from, const path &to)
Definition: filesystem.cpp:241
static time_point now()
Definition: time.cpp:13
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
void connect_to(const fc::ip::endpoint &remote_endpoint)
std::vector< char > data
Definition: message.hpp:60
#define MAX_MESSAGE_SIZE
Definition: config.hpp:41