2 #include <websocketpp/config/asio_client.hpp> 3 #include <websocketpp/config/asio.hpp> 4 #include <websocketpp/server.hpp> 5 #include <websocketpp/config/asio_client.hpp> 6 #include <websocketpp/client.hpp> 7 #include <websocketpp/logger/stub.hpp> 10 #include <websocketpp/extensions/permessage_deflate/enabled.hpp> 12 #include <websocketpp/extensions/permessage_deflate/disabled.hpp> 28 # undef DEFAULT_LOGGER 30 #define DEFAULT_LOGGER "rpc" 32 namespace fc {
namespace http {
37 static void add_windows_root_certs(boost::asio::ssl::context &ctx)
39 HCERTSTORE hStore = CertOpenSystemStore( 0,
"ROOT" );
43 X509_STORE *store = X509_STORE_new();
44 PCCERT_CONTEXT pContext = NULL;
45 while( (pContext = CertEnumCertificatesInStore( hStore, pContext )) != NULL )
47 X509 *x509 = d2i_X509( NULL, (
const unsigned char **)&pContext->pbCertEncoded,
48 pContext->cbCertEncoded);
51 X509_STORE_add_cert( store, x509 );
56 CertFreeCertificateContext( pContext );
57 CertCloseStore( hStore, 0 );
59 SSL_CTX_set_cert_store( ctx.native_handle(), store );
90 typedef websocketpp::transport::asio::endpoint<transport_config>
transport_type;
95 typedef websocketpp::extensions::permessage_deflate::enabled <permessage_deflate_config>
98 typedef websocketpp::extensions::permessage_deflate::disabled <permessage_deflate_config>
135 typedef websocketpp::extensions::permessage_deflate::enabled <permessage_deflate_config>
138 typedef websocketpp::extensions::permessage_deflate::disabled <permessage_deflate_config>
174 typedef websocketpp::extensions::permessage_deflate::enabled <permessage_deflate_config>
177 typedef websocketpp::extensions::permessage_deflate::disabled <permessage_deflate_config>
188 _remote_endpoint = con->get_remote_endpoint();
197 ilog(
"[OUT] ${remote_endpoint} ${msg}",
198 (
"remote_endpoint",_remote_endpoint) (
"msg",message) );
199 auto ec = _ws_connection->send( message );
200 FC_ASSERT( !ec,
"websocket send failed: ${msg}", (
"msg",ec.message() ) );
202 virtual void close( int64_t code,
const std::string& reason )
override 204 _ws_connection->close(code,reason);
209 return _ws_connection->get_request_header(key);
224 if( !forward_header_key.empty() )
226 const std::string value = this->get_request_header( forward_header_key );
228 this->_remote_endpoint = value;
237 typedef websocketpp::lib::shared_ptr<boost::asio::ssl::context>
context_ptr;
239 using websocketpp::connection_hdl;
246 :_server_thread(
fc::
thread::current() ), _forward_header_key( forward_header_key )
248 _server.clear_access_channels( websocketpp::log::alevel::all );
250 _server.set_reuse_addr(
true );
251 _server.set_open_handler( [
this]( connection_hdl hdl ){
252 _server_thread.async( [
this, hdl](){
255 _forward_header_key );
256 _on_connection( _connections[hdl] = new_con );
259 _server.set_message_handler( [
this]( connection_hdl hdl,
260 typename websocketpp::server<T>::message_ptr msg ){
261 _server_thread.async( [
this,hdl,msg](){
262 auto current_con = _connections.find(hdl);
263 if( current_con == _connections.end() )
265 auto payload = msg->get_payload();
266 std::shared_ptr<websocket_connection> con = current_con->second;
267 wlog(
"[IN] ${remote_endpoint} ${msg}",
268 (
"remote_endpoint",con->get_remote_endpoint_string()) (
"msg",payload) );
271 if( _pending_messages )
273 con->on_message( payload );
275 if( _pending_messages > 100 )
285 _server.set_socket_init_handler( []( websocketpp::connection_hdl hdl,
286 typename websocketpp::server<T>::connection_type::socket_type& s ) {
287 boost::asio::ip::tcp::no_delay option(
true);
288 s.lowest_layer().set_option(option);
291 _server.set_http_handler( [
this]( connection_hdl hdl ){
292 _server_thread.async( [
this,hdl](){
293 auto con = _server.get_con_from_hdl(hdl);
296 _on_connection( current_con );
298 con->defer_http_response();
300 std::string remote_endpoint = current_con->get_remote_endpoint_string();
301 std::string request_body = con->get_request_body();
302 wlog(
"[HTTP-IN] ${remote_endpoint} ${msg}",
303 (
"remote_endpoint",remote_endpoint) (
"msg",request_body) );
305 fc::async([current_con, request_body, con, remote_endpoint] {
307 ilog(
"[HTTP-OUT] ${remote_endpoint} ${status} ${msg}",
308 (
"remote_endpoint",remote_endpoint)
309 (
"status",response.
status)
312 con->set_status( websocketpp::http::status_code::value(response.
status) );
313 con->send_http_response();
314 current_con->closed();
319 _server.set_close_handler( [
this]( connection_hdl hdl ){
320 _server_thread.async( [
this,hdl](){
321 if( _connections.find(hdl) != _connections.end() )
323 _connections[hdl]->closed();
324 _connections.erase( hdl );
325 if( _connections.empty() && _all_connections_closed )
326 _all_connections_closed->set_value();
330 wlog(
"unknown connection closed" );
335 _server.set_fail_handler( [
this]( connection_hdl hdl ){
336 _server_thread.async( [
this,hdl](){
337 if( _connections.find(hdl) != _connections.end() )
339 _connections[hdl]->closed();
340 _connections.erase( hdl );
341 if( _connections.empty() && _all_connections_closed )
342 _all_connections_closed->set_value();
347 if( _server_socket_closed )
348 _server_socket_closed->set_value();
350 wlog(
"unknown connection failed" );
358 if( _server.is_listening() )
367 _server.stop_listening();
373 if( !_connections.empty() )
377 auto cpy_con = _connections;
378 websocketpp::lib::error_code ec;
379 for(
auto& item : cpy_con )
380 _server.close( item.first, 0,
"server exit", ec );
382 _all_connections_closed->wait();
385 if( _server_socket_closed )
386 _server_socket_closed->wait();
389 typedef std::map<connection_hdl, websocket_connection_ptr, std::owner_less<connection_hdl> >
con_map;
399 uint32_t _pending_messages = 0;
417 const std::string& forward_header_key )
420 _server.set_tls_init_handler( [server_pem,ssl_password]( websocketpp::connection_hdl hdl ) {
421 context_ptr ctx = websocketpp::lib::make_shared<boost::asio::ssl::context>(
422 boost::asio::ssl::context::tlsv12 );
424 ctx->set_options( boost::asio::ssl::context::default_workarounds |
425 boost::asio::ssl::context::no_sslv2 |
426 boost::asio::ssl::context::no_sslv3 |
427 boost::asio::ssl::context::no_tlsv1 |
428 boost::asio::ssl::context::no_tlsv1_1 |
429 boost::asio::ssl::context::single_dh_use );
430 ctx->set_password_callback(
431 [ssl_password](std::size_t max_length, boost::asio::ssl::context::password_purpose){
434 ctx->use_certificate_chain_file(server_pem);
435 ctx->use_private_key_file(server_pem, boost::asio::ssl::context::pem);
436 }
catch (std::exception& e) {
453 :_client_thread(
fc::
thread::current() )
455 _client.clear_access_channels( websocketpp::log::alevel::all );
456 _client.set_message_handler( [
this]( connection_hdl hdl,
457 typename websocketpp::client<T>::message_ptr msg ){
458 _client_thread.async( [
this,msg](){
459 wdump((msg->get_payload()));
460 auto received = msg->get_payload();
463 _connection->on_message(received);
467 _client.set_close_handler( [
this]( connection_hdl hdl ){
468 _client_thread.async( [
this](){
470 _connection->closed();
475 _closed->set_value();
477 _client.set_fail_handler( [
this]( connection_hdl hdl ){
478 auto con = _client.get_con_from_hdl(hdl);
479 auto message = con->get_ec().message();
482 _client_thread.async( [
this](){
484 _connection->closed();
489 if( _connected && !_connected->ready() )
495 _closed->set_value();
505 _connection->close(0,
"client closed");
514 websocketpp::lib::error_code ec;
519 _client.set_open_handler( [
this]( websocketpp::connection_hdl hdl ){
521 auto con = _client.get_con_from_hdl(hdl);
525 _connected->set_value();
528 auto con = _client.get_connection( uri, ec );
532 con->append_header( h.key, h.val );
535 FC_ASSERT( !ec,
"error: ${e}", (
"e",ec.message()) );
537 _client.connect(con);
583 std::string ca_filename_copy = ca_filename;
585 _client.set_tls_init_handler( [
this,ca_filename_copy](websocketpp::connection_hdl) {
586 context_ptr ctx = websocketpp::lib::make_shared<boost::asio::ssl::context>(
587 boost::asio::ssl::context::tlsv12);
589 ctx->set_options( boost::asio::ssl::context::default_workarounds |
590 boost::asio::ssl::context::no_sslv2 |
591 boost::asio::ssl::context::no_sslv3 |
592 boost::asio::ssl::context::no_tlsv1 |
593 boost::asio::ssl::context::no_tlsv1_1 |
594 boost::asio::ssl::context::single_dh_use );
596 setup_peer_verify( ctx, ca_filename_copy );
597 }
catch (std::exception& e) {
609 return websocketpp::uri( _uri ).get_host();
614 if( ca_filename ==
"_none" )
616 ctx->set_verify_mode( boost::asio::ssl::verify_peer );
617 if( ca_filename ==
"_default" )
620 add_windows_root_certs( *ctx );
622 ctx->set_default_verify_paths();
626 ctx->load_verify_file( ca_filename );
627 ctx->set_verify_depth(10);
628 ctx->set_verify_callback( boost::asio::ssl::rfc2818_verification( get_host() ) );
637 :my( new detail::websocket_server_impl( forward_header_key ) ) {}
642 my->_on_connection = handler;
647 my->_server.listen(port);
657 websocketpp::lib::asio::error_code ec;
658 return my->_server.get_local_endpoint(ec).port();
662 my->_server.start_accept();
667 my->_server.stop_listening();
672 auto cpy_con = my->_connections;
673 websocketpp::lib::error_code ec;
675 my->_server.close(
connection.first, websocketpp::close::status::normal,
"Goodbye", ec );
679 const std::string& forward_header_key )
680 :my( new detail::websocket_tls_server_impl(server_pem, ssl_password, forward_header_key) )
687 my->_on_connection = handler;
692 my->_server.listen(port);
702 websocketpp::lib::asio::error_code ec;
703 return my->_server.get_local_endpoint(ec).port();
707 my->_server.start_accept();
712 my->_server.stop_listening();
717 auto cpy_con = my->_connections;
718 websocketpp::lib::error_code ec;
720 my->_server.close(
connection.first, websocketpp::close::status::normal,
"Goodbye", ec );
725 :my( new detail::websocket_client_impl() ),
726 smy(new detail::websocket_tls_client_impl( ca_filename ))
734 FC_ASSERT( uri.substr(0,4) ==
"wss:" || uri.substr(0,3) ==
"ws:",
"Unsupported protocol" );
737 if( uri.substr(0,4) ==
"wss:" )
738 return smy->connect( uri, _headers );
741 return my->connect( uri, _headers );
753 my->_client.close( *my->_hdl, websocketpp::close::status::normal,
"Goodbye" );
765 _headers.emplace_back( key, value );
base::con_msg_manager_type con_msg_manager_type
std::map< connection_hdl, websocket_connection_ptr, std::owner_less< connection_hdl > > con_map
virtual void send_message(const std::string &message) override
base::request_type request_type
auto async(Functor &&f, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> fc::future< decltype(f())>
base::response_type response_type
type::response_type response_type
base::concurrency_type concurrency_type
static ptr create(const char *desc FC_TASK_NAME_DEFAULT_ARG)
base::message_type message_type
virtual ~generic_websocket_client_impl()
base::response_type response_type
fc::optional< connection_hdl > _hdl
websocketpp::transport::asio::endpoint< transport_config > transport_type
type::alog_type alog_type
virtual ~websocket_server_impl()
type::alog_type alog_type
base::concurrency_type concurrency_type
websocketpp::transport::asio::tls_socket::endpoint socket_type
fc::promise< void >::ptr _server_socket_closed
Promise to wait for the server socket to be closed.
type::elog_type elog_type
T wait(boost::signals2::signal< void(T)> &sig, const microseconds &timeout_us=microseconds::maximum())
void listen(uint16_t port)
uint16_t get_listening_port()
base::concurrency_type concurrency_type
websocketpp::log::stub alog_type
websocketpp::lib::shared_ptr< boost::asio::ssl::context > context_ptr
websocketpp::extensions::permessage_deflate::disabled< permessage_deflate_config > permessage_deflate_type
base::message_type message_type
websocket_server_impl(const std::string &forward_header_key)
fc::promise< void >::ptr _all_connections_closed
Promise to wait for all connections to be closed.
Used to generate a useful error report when an exception is thrown.At each level in the stack where t...
virtual ~generic_websocket_server_impl()
virtual ~possibly_proxied_websocket_connection()
std::shared_ptr< connection > connection_ptr
type::response_type response_type
virtual ~websocket_connection_impl()
void on_connection(const on_connection_handler &handler)
on_connection_handler _on_connection
A handler to be called when a new connection is accepted.
websocketpp::extensions::permessage_deflate::disabled< permessage_deflate_config > permessage_deflate_type
virtual ~websocket_tls_server_impl()
void on_connection(const on_connection_handler &handler)
std::shared_ptr< websocket_connection > websocket_connection_ptr
websocket_connection_ptr connect(const std::string &uri, const fc::http::headers &headers)
void setup_peer_verify(context_ptr &ctx, const std::string &ca_filename)
websocketpp::server< T > _server
The server.
websocket_server(const std::string &forward_header_key)
websocketpp::log::stub elog_type
fc::promise< void >::ptr _closed
std::shared_ptr< exception > exception_ptr
generic_websocket_client_impl()
websocket_connection_ptr secure_connect(const std::string &uri)
std::string get_host() const
base::endpoint_msg_manager_type endpoint_msg_manager_type
websocketpp::transport::asio::endpoint< transport_config > transport_type
type::response_type response_type
type::concurrency_type concurrency_type
websocketpp::client< T > _client
std::function< void(const websocket_connection_ptr &)> on_connection_handler
base::request_type request_type
websocketpp::log::stub alog_type
virtual std::string get_request_header(const std::string &key) override
websocket_connection_ptr connect(const std::string &uri)
con_map _connections
Holds accepted connections.
#define FC_CAPTURE_AND_RETHROW(...)
const address & get_address() const
websocketpp::transport::asio::tls_socket::endpoint socket_type
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
type::request_type request_type
base::request_type request_type
boost::asio::ip::tcp::endpoint endpoint
websocketpp::log::stub alog_type
type::concurrency_type concurrency_type
fc::thread & _server_thread
The thread that runs the server.
base::endpoint_msg_manager_type endpoint_msg_manager_type
type::concurrency_type concurrency_type
base::endpoint_msg_manager_type endpoint_msg_manager_type
type::elog_type elog_type
websocket_tls_server_impl(const string &server_pem, const string &ssl_password, const std::string &forward_header_key)
websocketpp::transport::asio::basic_socket::endpoint socket_type
type::alog_type alog_type
websocket_connection_ptr _connection
type::elog_type elog_type
type::request_type request_type
boost::asio::io_service & default_io_service()
generic_websocket_server_impl(const std::string &forward_header_key)
std::vector< header > headers
base::message_type message_type
uint16_t get_listening_port()
websocket_connection_impl(T con)
websocket_tls_server(const std::string &server_pem, const std::string &ssl_password, const std::string &forward_header_key)
base::con_msg_manager_type con_msg_manager_type
std::string body_as_string
void listen(uint16_t port)
websocket_tls_client_impl(const std::string &ca_filename)
base::con_msg_manager_type con_msg_manager_type
possibly_proxied_websocket_connection(T con, const std::string &forward_header_key)
fc::promise< void >::ptr _connected
std::string _forward_header_key
A header like "X-Forwarded-For" (XFF) with data IP:port.
virtual ~websocket_tls_client_impl()
websocketpp::log::stub elog_type
type::request_type request_type
fc::thread & _client_thread
virtual void close(int64_t code, const std::string &reason) override
websocketpp::transport::asio::endpoint< transport_config > transport_type
#define FC_EXCEPTION(EXCEPTION_TYPE, FORMAT,...)
websocketpp::extensions::permessage_deflate::disabled< permessage_deflate_config > permessage_deflate_type
void append_header(const std::string &key, const std::string &value)
websocket_client(const std::string &ca_filename="_default")
websocketpp::log::stub elog_type
base::response_type response_type