38 # undef DEFAULT_LOGGER 40 #define DEFAULT_LOGGER "p2p" 43 # define VERIFY_CORRECT_THREAD() assert(_thread->is_current()) 45 # define VERIFY_CORRECT_THREAD() do {} while (0) 59 uint64_t _bytes_received;
66 std::atomic_bool _send_message_in_progress;
67 std::atomic_bool _read_loop_in_progress;
73 void start_read_loop();
101 _ready_for_sending(
fc::promise<void>::create()),
104 _send_message_in_progress(false),
105 _read_loop_in_progress(false)
107 ,_thread(&
fc::thread::current())
127 assert(!_read_loop_done.
valid());
128 _read_loop_done =
fc::async([=](){ read_loop(); },
"message read_loop");
136 assert(!_read_loop_done.
valid());
137 _read_loop_done =
fc::async([=](){ read_loop(); },
"message read_loop");
144 _sock.
bind(local_endpoint);
149 std::atomic_bool* _flag;
153 bool expected =
false;
154 FC_ASSERT( flag->compare_exchange_strong( expected,
true ),
"Only one thread at time can visit it");
162 void message_oriented_connection_impl::read_loop()
165 const int BUFFER_SIZE = 16;
167 static_assert(BUFFER_SIZE >=
sizeof(
message_header),
"insufficient buffer");
174 bool call_on_connection_closed =
false;
179 char buffer[BUFFER_SIZE];
182 _sock.read(buffer, BUFFER_SIZE);
183 _bytes_received += BUFFER_SIZE;
187 size_t remaining_bytes_with_padding = 16 * ((m.size.value() - LEFTOVER + 15) / 16);
188 m.data.resize(LEFTOVER + remaining_bytes_with_padding);
190 if (remaining_bytes_with_padding)
192 _sock.read(&m.data[LEFTOVER], remaining_bytes_with_padding);
193 _bytes_received += remaining_bytes_with_padding;
195 m.data.resize(m.size.value());
202 _delegate->on_message(_self, m);
205 catch (
const fc::canceled_exception& e ) {
throw; }
206 catch (
const fc::eof_exception& e ) {
throw; }
215 catch (
const fc::canceled_exception& e )
217 wlog(
"caught a canceled_exception in read_loop. this should mean we're in the process of deleting this object already, so there's no need to notify the delegate: ${e}", (
"e", e.to_detail_string() ) );
220 catch (
const fc::eof_exception& e )
222 wlog(
"disconnected ${e}", (
"e", e.to_detail_string() ) );
223 call_on_connection_closed =
true;
228 call_on_connection_closed =
true;
231 catch (
const std::exception& e )
233 elog(
"disconnected ${er}", (
"er", e.what() ) );
234 call_on_connection_closed =
true;
239 elog(
"unexpected exception" );
240 call_on_connection_closed =
true;
244 if (call_on_connection_closed)
245 _delegate->on_connection_closed(_self);
247 if (exception_to_rethrow)
248 throw *exception_to_rethrow;
254 #if 0 // this gets too verbose 257 if (_sock.get_socket().is_open())
258 remote_endpoint = _sock.get_socket().remote_endpoint();
259 struct scope_logger {
262 ~scope_logger() {
dlog(
"leaving message_oriented_connection::send_message() for peer ${endpoint}", (
"endpoint", endpoint)); }
263 } send_message_scope_logger(remote_endpoint);
267 _ready_for_sending->wait();
271 size_t size_of_message_and_header =
sizeof(
message_header) + message_to_send.
size.value();
273 elog(
"Trying to send a message larger than MAX_MESSAGE_SIZE. This probably won't work...");
275 size_t size_with_padding = 16 * ((size_of_message_and_header + 15) / 16);
276 std::vector<char> padded_message( size_with_padding );
278 memcpy( padded_message.data(), (
const char*)&message_to_send,
sizeof(
message_header) );
280 message_to_send.
size.value() );
281 char* padding_space = padded_message.data() +
sizeof(
message_header) + message_to_send.
size.value();
282 memset(padding_space, 0, size_with_padding - size_of_message_and_header);
283 _sock.write( padded_message.data(), size_with_padding );
285 _bytes_sent += size_with_padding;
301 if (_sock.get_socket().is_open())
302 remote_endpoint = _sock.get_socket().remote_endpoint();
303 ilog(
"in destroy_connection() for ${endpoint}", (
"endpoint", remote_endpoint) );
305 if (_send_message_in_progress)
306 elog(
"Error: message_oriented_connection is being destroyed while a send_message is in progress. " 307 "The task calling send_message() should have been canceled already");
308 assert(!_send_message_in_progress);
312 _read_loop_done.cancel_and_wait(__FUNCTION__);
316 wlog(
"Exception thrown while canceling message_oriented_connection's read_loop, ignoring: ${e}", (
"e",e) );
320 wlog(
"Exception thrown while canceling message_oriented_connection's read_loop, ignoring" );
322 _ready_for_sending->set_exception( std::make_shared<fc::canceled_exception>() );
334 return _bytes_received;
340 return _last_message_sent_time;
346 return _last_message_received_time;
352 return _sock.get_shared_secret();
369 return my->get_socket();
379 my->connect_to(remote_endpoint);
384 my->bind(local_endpoint);
389 my->send_message(message_to_send);
394 my->close_connection();
399 my->destroy_connection();
404 return my->get_total_bytes_sent();
409 return my->get_total_bytes_received();
414 return my->get_last_message_sent_time();
419 return my->get_last_message_received_time();
423 return my->get_connection_time();
427 return my->get_shared_secret();
fc::time_point get_connection_time() const
message_oriented_connection_impl(message_oriented_connection *self, message_oriented_connection_delegate *delegate=nullptr)
auto async(Functor &&f, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> fc::future< decltype(f())>
fc::sha512 get_shared_secret() const
fc::time_point get_last_message_received_time() const
void connect_to(const fc::ip::endpoint &remote_endpoint)
fc::sha512 get_shared_secret() const
void connect_to(const fc::ip::endpoint &remote_endpoint)
Used to generate a useful error report when an exception is thrown. At each level in the stack where t...
void destroy_connection()
#define FC_RETHROW_EXCEPTIONS(LOG_LEVEL, FORMAT,...)
Catches all exceptions, std::exceptions, and ... and rethrows them after appending the provided log m...
void destroy_connection()
message_oriented_connection(message_oriented_connection_delegate *delegate=nullptr)
provides stack-based nullable value similar to boost::optional
fc::tcp_socket & get_socket()
~message_oriented_connection()
uint64_t get_total_bytes_sent() const
uint64_t get_total_bytes_sent() const
fc::tcp_socket & get_socket()
no_parallel_execution_guard(std::atomic_bool *flag)
re-thrown whenever an unhandled exception is caught. Any exceptions thrown by 3rd party libraries that...
uint64_t get_total_bytes_received() const
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
void bind(const fc::ip::endpoint &local_endpoint)
boost::asio::ip::tcp::endpoint endpoint
fc::time_point get_last_message_sent_time() const
uint64_t get_total_bytes_received() const
std::string to_detail_string(log_level ll=log_level::all) const
void send_message(const message &message_to_send)
#define VERIFY_CORRECT_THREAD()
~no_parallel_execution_guard()
fc::tcp_socket & get_socket()
void bind(const fc::ip::endpoint &local_endpoint)
~message_oriented_connection_impl()
fc::time_point get_last_message_received_time() const
void bind(const fc::ip::endpoint &local_endpoint)
void copy(const path &from, const path &to)
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
fc::time_point get_last_message_sent_time() const
void send_message(const message &message_to_send)
void connect_to(const fc::ip::endpoint &remote_endpoint)
fc::time_point get_connection_time() const