BitShares-Core  5.0.0
BitShares blockchain implementation and command-line interface software
message_oriented_connection.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Cryptonomex, Inc., and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 #include <fc/thread/thread.hpp>
25 #include <fc/thread/mutex.hpp>
27 #include <fc/thread/future.hpp>
28 #include <fc/log/logger.hpp>
29 #include <fc/io/enum_type.hpp>
30 
33 #include <graphene/net/config.hpp>
34 
35 #include <atomic>
36 
// Force DEFAULT_LOGGER to "p2p" for this file, replacing any earlier definition.
// Presumably this is the logger name picked up by the fc logging macros
// (ilog/wlog/elog/dlog) used below — confirm against fc/log/logger.hpp.
37 #ifdef DEFAULT_LOGGER
38 # undef DEFAULT_LOGGER
39 #endif
40 #define DEFAULT_LOGGER "p2p"
41 
// Debug builds: assert on _thread->is_current(); _thread is the debug-only member
// initialised from fc::thread::current() in the impl's initializer list.
// Release builds (NDEBUG defined): expands to an empty statement.
42 #ifndef NDEBUG
43 # define VERIFY_CORRECT_THREAD() assert(_thread->is_current())
44 #else
45 # define VERIFY_CORRECT_THREAD() do {} while (0)
46 #endif
47 
48 namespace graphene { namespace net {
49  namespace detail
50  {
52  {
     // Private implementation behind message_oriented_connection: owns the socket,
     // the read loop task and the traffic statistics.
     // NOTE(review): this listing omits the class head and source lines 54-55, 75, 80,
     // 82, 91-92 and 94 of the class body — consult the real file before editing.
53  private:
     // Socket wrapper; every read, write, flush and close in this file goes through it.
56  stcp_socket _sock;
     // Fulfilled right after _sock.accept() / _sock.connect_to() returns. send_message()
     // waits on it; destroy_connection() sets a canceled_exception on it.
57  fc::promise<void>::ptr _ready_for_sending;
     // Future of the fc::async task that runs read_loop(); cancelled and awaited in
     // destroy_connection().
58  fc::future<void> _read_loop_done;
     // Running byte totals. Both counters include the padding bytes that are
     // read from / written to the socket, not just the message payloads.
59  uint64_t _bytes_received;
60  uint64_t _bytes_sent;
61 
     // Set once, at the start of read_loop().
62  fc::time_point _connected_time;
     // Updated in read_loop() after each complete message has been read.
63  fc::time_point _last_message_received_time;
     // Updated in send_message() after each successful write + flush.
64  fc::time_point _last_message_sent_time;
65 
     // Flags held by no_parallel_execution_guard while send_message() / read_loop()
     // are executing; a second concurrent entry fails the guard's FC_ASSERT.
66  std::atomic_bool _send_message_in_progress;
67  std::atomic_bool _read_loop_in_progress;
68 #ifndef NDEBUG
     // Debug only: initialised from fc::thread::current() in the initializer list and
     // checked by VERIFY_CORRECT_THREAD().
69  fc::thread* _thread;
70 #endif
71 
72  void read_loop();
     // NOTE(review): no definition of start_read_loop() is visible in this listing;
     // the read loop is launched inline via fc::async — confirm whether this is dead.
73  void start_read_loop();
74  public:
76  void accept();
77  void connect_to(const fc::ip::endpoint& remote_endpoint);
78  void bind(const fc::ip::endpoint& local_endpoint);
79 
81  message_oriented_connection_delegate* delegate = nullptr);
83 
84  void send_message(const message& message_to_send);
85  void close_connection();
86  void destroy_connection();
87 
88  uint64_t get_total_bytes_sent() const;
89  uint64_t get_total_bytes_received() const;
90 
93  fc::time_point get_connection_time() const { return _connected_time; }
95  };
96 
99  : _self(self),
100  _delegate(delegate),
101  _ready_for_sending(fc::promise<void>::create()),
102  _bytes_received(0),
103  _bytes_sent(0),
104  _send_message_in_progress(false),
105  _read_loop_in_progress(false)
106 #ifndef NDEBUG
107  ,_thread(&fc::thread::current())
108 #endif
109  {
110  }
112  {
115  }
116 
118  {
120  return _sock.get_socket();
121  }
122 
124  {
     // NOTE(review): the signature line and source line 125 are missing from this
     // listing; the body matches accept() as declared in the impl class.
     // Runs _sock.accept(), then starts the read loop and unblocks senders.
126  _sock.accept();
127  assert(!_read_loop_done.valid()); // check to be sure we never launch two read loops
     // Run read_loop() as an fc::async task; the future is kept so that
     // destroy_connection() can cancel it and wait for it to finish.
     // NOTE(review): [=] captures `this` implicitly here, which C++20 deprecates.
128  _read_loop_done = fc::async([=](){ read_loop(); }, "message read_loop");
     // send_message() waits on this promise before touching the socket.
129  _ready_for_sending->set_value();
130  }
131 
133  {
     // NOTE(review): the signature line and source line 134 are missing from this
     // listing; the body matches connect_to(remote_endpoint) as declared in the impl class.
     // Connects _sock to remote_endpoint, then starts the read loop and unblocks senders.
135  _sock.connect_to(remote_endpoint);
136  assert(!_read_loop_done.valid()); // check to be sure we never launch two read loops
     // Run read_loop() as an fc::async task; the future is kept so that
     // destroy_connection() can cancel it and wait for it to finish.
     // NOTE(review): [=] captures `this` implicitly here, which C++20 deprecates.
137  _read_loop_done = fc::async([=](){ read_loop(); }, "message read_loop");
     // send_message() waits on this promise before touching the socket.
138  _ready_for_sending->set_value();
139  }
140 
142  {
144  _sock.bind(local_endpoint);
145  }
146 
148  {
     // Scope guard that rejects overlapping execution of a code section: the
     // constructor claims the flag and the block further down releases it.
     // NOTE(review): the class head (source line 147) and source line 156 — presumably
     // the destructor signature — are missing from this listing.
149  std::atomic_bool* _flag;
150  public:
     // Atomically flips *flag from false to true. If the flag is already true the
     // compare-exchange fails and FC_ASSERT raises, so the object is never constructed
     // and the flag stays owned by whoever set it.
151  explicit no_parallel_execution_guard(std::atomic_bool* flag) : _flag(flag)
152  {
153  bool expected = false;
154  FC_ASSERT( flag->compare_exchange_strong( expected, true ), "Only one thread at time can visit it");
155  }
     // Releases the flag again (body of the member whose signature line is missing).
157  {
158  *_flag = false;
159  }
160  };
161 
// Receive loop for this connection. Each iteration reads one message from _sock:
// first a fixed BUFFER_SIZE-byte block that holds the message header plus the first
// LEFTOVER payload bytes, then the rest of the payload rounded up to a multiple of
// 16 bytes. The padding is trimmed and the message is passed to _delegate->on_message().
// The loop has no normal exit; it ends only through an exception:
//  - fc::canceled_exception: logged and rethrown, the delegate is NOT notified;
//  - fc::eof_exception: logged, then _delegate->on_connection_closed(_self) is called;
//  - any other exception: logged, on_connection_closed() is called, and afterwards an
//    fc::unhandled_exception wrapping the description is thrown.
// NOTE(review): source line 164 (between the opening brace and BUFFER_SIZE) is
// missing from this listing — check the real file before editing.
162  void message_oriented_connection_impl::read_loop()
163  {
     // Size of the first read; must be able to hold at least a message_header.
165  const int BUFFER_SIZE = 16;
     // Payload bytes that arrive in the first read, right after the header.
166  const int LEFTOVER = BUFFER_SIZE - sizeof(message_header);
167  static_assert(BUFFER_SIZE >= sizeof(message_header), "insufficient buffer");
168 
     // Fails (FC_ASSERT) if another read_loop() is already running on this object.
169  no_parallel_execution_guard guard( &_read_loop_in_progress );
170 
171  _connected_time = fc::time_point::now();
172 
     // Filled in by the catch clauses below so that the delegate can be notified
     // before the error is propagated.
173  fc::oexception exception_to_rethrow;
174  bool call_on_connection_closed = false;
175 
176  try
177  {
     // m and buffer are reused across iterations.
178  message m;
179  char buffer[BUFFER_SIZE];
180  while( true )
181  {
182  _sock.read(buffer, BUFFER_SIZE);
183  _bytes_received += BUFFER_SIZE;
     // Overwrite the leading sizeof(message_header) bytes of m with the received header.
     // NOTE(review): presumably message derives from / starts with message_header — confirm.
184  memcpy((char*)&m, buffer, sizeof(message_header));
     // Reject oversized messages before allocating; the failure is handled by the
     // outer fc::exception clause, which ends the loop.
185  FC_ASSERT( m.size.value() <= MAX_MESSAGE_SIZE, "", ("m.size",m.size.value())("MAX_MESSAGE_SIZE",MAX_MESSAGE_SIZE) );
186 
     // Bytes still on the wire: payload beyond the first LEFTOVER bytes, rounded up to 16.
     // NOTE(review): when m.size is smaller than LEFTOVER the subtraction goes below
     // zero; the expression appears to rely on unsigned wraparound (assuming value()
     // is an unsigned 32-bit integer) to still yield 0 — confirm, and consider an
     // explicit branch.
187  size_t remaining_bytes_with_padding = 16 * ((m.size.value() - LEFTOVER + 15) / 16);
188  m.data.resize(LEFTOVER + remaining_bytes_with_padding); //give extra 16 bytes to allow for padding added in send call
     // Move the payload bytes that came with the first read to the front of m.data.
189  std::copy(buffer + sizeof(message_header), buffer + sizeof(buffer), m.data.begin());
190  if (remaining_bytes_with_padding)
191  {
192  _sock.read(&m.data[LEFTOVER], remaining_bytes_with_padding);
193  _bytes_received += remaining_bytes_with_padding;
194  }
195  m.data.resize(m.size.value()); // truncate off the padding bytes
196 
197  _last_message_received_time = fc::time_point::now();
198 
199  try
200  {
201  // message handling errors are warnings...
202  _delegate->on_message(_self, m);
203  }
     // NOTE(review): source lines 204 and 209 are missing from this listing.
     // All three clauses rethrow, so despite the comment above, any fc::exception
     // from on_message() leaves the while loop and reaches the outer handlers; the
     // third clause only adds a warning log entry first.
205  catch ( const fc::canceled_exception& e ) { throw; }
206  catch ( const fc::eof_exception& e ) { throw; }
207  catch ( const fc::exception& e)
208  {
210  wlog( "message transmission failed ${er}", ("er", e.to_detail_string() ) );
211  throw;
212  }
213  }
214  }
215  catch ( const fc::canceled_exception& e )
216  {
217  wlog( "caught a canceled_exception in read_loop. this should mean we're in the process of deleting this object already, so there's no need to notify the delegate: ${e}", ("e", e.to_detail_string() ) );
218  throw;
219  }
220  catch ( const fc::eof_exception& e )
221  {
     // End of stream: notify the delegate below, nothing is rethrown.
222  wlog( "disconnected ${e}", ("e", e.to_detail_string() ) );
223  call_on_connection_closed = true;
224  }
225  catch ( const fc::exception& e )
226  {
     // Any other fc error: notify the delegate, then throw a wrapped copy.
227  elog( "disconnected ${er}", ("er", e.to_detail_string() ) );
228  call_on_connection_closed = true;
229  exception_to_rethrow = fc::unhandled_exception(FC_LOG_MESSAGE(warn, "disconnected: ${e}", ("e", e.to_detail_string())));
230  }
231  catch ( const std::exception& e )
232  {
233  elog( "disconnected ${er}", ("er", e.what() ) );
234  call_on_connection_closed = true;
235  exception_to_rethrow = fc::unhandled_exception(FC_LOG_MESSAGE(warn, "disconnected: ${e}", ("e", e.what())));
236  }
237  catch ( ... )
238  {
239  elog( "unexpected exception" );
240  call_on_connection_closed = true;
241  exception_to_rethrow = fc::unhandled_exception(FC_LOG_MESSAGE(warn, "disconnected: ${e}", ("e", fc::except_str())));
242  }
243 
     // Notification happens outside the catch clauses, before the deferred throw.
244  if (call_on_connection_closed)
245  _delegate->on_connection_closed(_self);
246 
247  if (exception_to_rethrow)
248  throw *exception_to_rethrow;
249  }
250 
252  {
     // NOTE(review): the signature line and source lines 253 and 260 are missing from
     // this listing; the body matches send_message(const message& message_to_send).
     // Serialises one message as header + payload, zero-padded to a multiple of 16
     // bytes, writes and flushes it on _sock, then updates _bytes_sent and
     // _last_message_sent_time. Any exception is rethrown by FC_RETHROW_EXCEPTIONS
     // with "unable to send message" appended.
254 #if 0 // this gets too verbose
255 #ifndef NDEBUG
256  fc::optional<fc::ip::endpoint> remote_endpoint;
257  if (_sock.get_socket().is_open())
258  remote_endpoint = _sock.get_socket().remote_endpoint();
259  struct scope_logger {
261  scope_logger(const fc::optional<fc::ip::endpoint>& endpoint) : endpoint(endpoint) { dlog("entering message_oriented_connection::send_message() for peer ${endpoint}", ("endpoint", endpoint)); }
262  ~scope_logger() { dlog("leaving message_oriented_connection::send_message() for peer ${endpoint}", ("endpoint", endpoint)); }
263  } send_message_scope_logger(remote_endpoint);
264 #endif
265 #endif
     // Fails (FC_ASSERT) if another send_message() is already in progress on this object.
266  no_parallel_execution_guard guard( &_send_message_in_progress );
     // Blocks until the promise is fulfilled after accept()/connect_to(); presumably
     // throws if destroy_connection() has set an exception on it — confirm.
267  _ready_for_sending->wait();
268 
269  try
270  {
271  size_t size_of_message_and_header = sizeof(message_header) + message_to_send.size.value();
     // NOTE(review): an oversized message is only logged and is still sent, while
     // read_loop() rejects anything above MAX_MESSAGE_SIZE with FC_ASSERT.
272  if( message_to_send.size.value() > MAX_MESSAGE_SIZE )
273  elog("Trying to send a message larger than MAX_MESSAGE_SIZE. This probably won't work...");
274  //pad the message we send to a multiple of 16 bytes
275  size_t size_with_padding = 16 * ((size_of_message_and_header + 15) / 16);
276  std::unique_ptr<char[]> padded_message(new char[size_with_padding]);
277 
     // Header bytes are copied straight from the start of the message object.
     // NOTE(review): presumably message begins with its message_header — confirm.
278  memcpy(padded_message.get(), (char*)&message_to_send, sizeof(message_header));
     // NOTE(review): copies size.value() bytes without checking it against
     // data.size(); assumes the two always agree — verify against message.hpp.
279  memcpy(padded_message.get() + sizeof(message_header), message_to_send.data.data(), message_to_send.size.value() );
     // Zero the tail so no uninitialised heap bytes are written to the socket.
280  char* padding_space = padded_message.get() + sizeof(message_header) + message_to_send.size.value();
281  memset(padding_space, 0, size_with_padding - size_of_message_and_header);
282  _sock.write(padded_message.get(), size_with_padding);
283  _sock.flush();
     // Statistics are updated only after write and flush have both returned.
284  _bytes_sent += size_with_padding;
285  _last_message_sent_time = fc::time_point::now();
286  } FC_RETHROW_EXCEPTIONS( warn, "unable to send message" );
287  }
288 
290  {
292  _sock.close();
293  }
294 
296  {
     // NOTE(review): the signature line and source line 297 are missing from this
     // listing; the body matches destroy_connection() (see the ilog text below).
     // Tears the connection down: logs the peer, cancels the read loop task and waits
     // for it, then fails _ready_for_sending with a canceled_exception.
298 
     // Left empty when the socket is not open.
299  fc::optional<fc::ip::endpoint> remote_endpoint;
300  if (_sock.get_socket().is_open())
301  remote_endpoint = _sock.get_socket().remote_endpoint();
302  ilog( "in destroy_connection() for ${endpoint}", ("endpoint", remote_endpoint) );
303 
     // A send still in flight is reported in every build; debug builds additionally assert.
304  if (_send_message_in_progress)
305  elog("Error: message_oriented_connection is being destroyed while a send_message is in progress. "
306  "The task calling send_message() should have been canceled already");
307  assert(!_send_message_in_progress);
308 
     // Deliberate best effort: whatever the cancelled read loop throws is logged
     // and swallowed so that teardown always continues.
309  try
310  {
311  _read_loop_done.cancel_and_wait(__FUNCTION__);
312  }
313  catch ( const fc::exception& e )
314  {
315  wlog( "Exception thrown while canceling message_oriented_connection's read_loop, ignoring: ${e}", ("e",e) );
316  }
317  catch (...)
318  {
319  wlog( "Exception thrown while canceling message_oriented_connection's read_loop, ignoring" );
320  }
     // NOTE(review): accept()/connect_to() may already have called set_value() on this
     // promise; confirm that set_exception() on a fulfilled fc::promise is harmless.
321  _ready_for_sending->set_exception( std::make_shared<fc::canceled_exception>() );
322  }
323 
325  {
327  return _bytes_sent;
328  }
329 
331  {
333  return _bytes_received;
334  }
335 
337  {
339  return _last_message_sent_time;
340  }
341 
343  {
345  return _last_message_received_time;
346  }
347 
349  {
351  return _sock.get_shared_secret();
352  }
353 
354  } // end namespace graphene::net::detail
355 
356 
358  my(new detail::message_oriented_connection_impl(this, delegate))
359  {
360  }
361 
363  {
364  }
365 
367  {
368  return my->get_socket();
369  }
370 
372  {
373  my->accept();
374  }
375 
377  {
378  my->connect_to(remote_endpoint);
379  }
380 
382  {
383  my->bind(local_endpoint);
384  }
385 
387  {
388  my->send_message(message_to_send);
389  }
390 
392  {
393  my->close_connection();
394  }
395 
397  {
398  my->destroy_connection();
399  }
400 
402  {
403  return my->get_total_bytes_sent();
404  }
405 
407  {
408  return my->get_total_bytes_received();
409  }
410 
412  {
413  return my->get_last_message_sent_time();
414  }
415 
417  {
418  return my->get_last_message_received_time();
419  }
421  {
422  return my->get_connection_time();
423  }
425  {
426  return my->get_shared_secret();
427  }
428 
429 } } // end namespace graphene::net
std::string except_str()
Definition: exception.cpp:272
message_oriented_connection_impl(message_oriented_connection *self, message_oriented_connection_delegate *delegate=nullptr)
bool valid() const
Definition: future.hpp:311
auto async(Functor &&f, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> fc::future< decltype(f())>
Definition: thread.hpp:227
void connect_to(const fc::ip::endpoint &remote_endpoint)
Definition: stcp_socket.cpp:72
Definition: api.cpp:56
#define elog(FORMAT,...)
Definition: logger.hpp:129
void connect_to(const fc::ip::endpoint &remote_endpoint)
Used to generate a useful error report when an exception is thrown. At each level in the stack where t...
Definition: exception.hpp:56
std::string to_detail_string(log_level ll=log_level::all) const
Definition: exception.cpp:183
#define FC_RETHROW_EXCEPTIONS(LOG_LEVEL, FORMAT,...)
Catches all exceptions, std::exceptions, and ... and rethrows them after appending the provided log m...
Definition: exception.hpp:463
#define wlog(FORMAT,...)
Definition: logger.hpp:123
message_oriented_connection(message_oriented_connection_delegate *delegate=nullptr)
provides stack-based nullable value similar to boost::optional
Definition: optional.hpp:20
fc::tcp_socket & get_socket()
Definition: stcp_socket.hpp:40
Re-thrown whenever an unhandled exception is caught. Any exceptions thrown by 3rd party libraries that...
Definition: exception.hpp:146
#define ilog(FORMAT,...)
Definition: logger.hpp:117
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
Definition: exception.hpp:345
boost::asio::ip::tcp::endpoint endpoint
Definition: asio.hpp:240
void send_message(const message &message_to_send)
#define VERIFY_CORRECT_THREAD()
#define dlog(FORMAT,...)
Definition: logger.hpp:100
void bind(const fc::ip::endpoint &local_endpoint)
Definition: api.hpp:15
boost::endian::little_uint32_buf_t size
Definition: message.hpp:45
void bind(const fc::ip::endpoint &local_endpoint)
Definition: stcp_socket.cpp:78
void copy(const path &from, const path &to)
Definition: filesystem.cpp:241
static time_point now()
Definition: time.cpp:13
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
void connect_to(const fc::ip::endpoint &remote_endpoint)
std::vector< char > data
Definition: message.hpp:62
#define MAX_MESSAGE_SIZE
Definition: config.hpp:39