BitShares-Core  5.0.0
BitShares blockchain implementation and command-line interface software
future.cpp
Go to the documentation of this file.
1 #include <fc/thread/future.hpp>
3 #include <fc/thread/thread.hpp>
6 
7 #include <boost/assert.hpp>
8 
9 namespace fc {
10 
11  promise_base::promise_base( const char* desc )
12  :_ready(false),
13  _blocked_thread(nullptr),
14  _blocked_fiber_count(0),
15  _timeout(time_point::maximum()),
16  _canceled(false),
17 #ifndef NDEBUG
18  _cancellation_reason(nullptr),
19 #endif
20  _desc(desc),
21  _compl(nullptr)
22  { }
23 
25 
26  const char* promise_base::get_desc()const{
27  return _desc;
28  }
29 
30  void promise_base::cancel(const char* reason /* = nullptr */){
31 // wlog("${desc} canceled!", ("desc", _desc? _desc : ""));
32  _canceled = true;
33 #ifndef NDEBUG
34  _cancellation_reason = reason;
35 #endif
36  }
37  bool promise_base::ready()const {
38  return _ready.load();
39  }
40  bool promise_base::error()const {
41  return std::atomic_load( &_exceptp ) != nullptr;
42  }
43 
45  std::atomic_store( &_exceptp, e );
46  _set_value(nullptr);
47  }
48 
49  void promise_base::_wait( const microseconds& timeout_us ){
50  if( timeout_us == microseconds::maximum() )
52  else
53  _wait_until( time_point::now() + timeout_us );
54  }
55  void promise_base::_wait_until( const time_point& timeout_us ){
56  if( _ready.load() ) {
57  fc::exception_ptr ex = std::atomic_load( &_exceptp );
58  if( ex )
59  ex->dynamic_rethrow_exception();
60  return;
61  }
62  _enqueue_thread();
63  // Need to check _ready again to avoid a race condition.
64  if( _ready.load() )
65  {
66  _dequeue_thread();
67  return _wait_until( timeout_us ); // this will simply return or throw _exceptp
68  }
69 
71  //
72  // Create shared_ptr to take ownership of this; i.e. this will
73  // be deleted when p_this goes out of scope. Consequently,
74  // it would be Very Bad to let p_this go out of scope
75  // before we're done reading/writing instance variables!
76  // See https://github.com/cryptonomex/graphene/issues/597
77  //
78  ptr p_this = shared_from_this();
79  try
80  {
81  //
82  // We clone p_this here because the wait_until() API requires us
83  // to use std::move(). I.e. wait_until() takes ownership of any
84  // pointer passed to it. Since we want to keep ownership ourselves,
85  // we need to have two shared_ptr's to this:
86  //
87  // - p_this to keep this alive until the end of the current function
88  // - p_this2 to be owned by wait_until() as the wait_until() API requires
89  //
90  ptr p_this2 = p_this;
91  thread::current().wait_until( std::move( p_this2 ), timeout_us );
92  }
93  catch (...) { e = std::current_exception(); }
94 
95  _dequeue_thread();
96 
97  if( e ) std::rethrow_exception(e);
98 
99  if( _ready.load() ) return _wait_until( timeout_us ); // this will simply return or throw _exceptp
100 
101  FC_THROW_EXCEPTION( timeout_exception, "" );
102  }
103  void promise_base::_enqueue_thread(){
104  _blocked_fiber_count.fetch_add( 1 );
105  thread* blocked_thread = _blocked_thread.load();
106  // only one thread can wait on a promise at any given time
107  do
108  assert( !blocked_thread || blocked_thread == &thread::current() );
109  while( !_blocked_thread.compare_exchange_weak( blocked_thread, &thread::current() ) );
110  }
111  void promise_base::_dequeue_thread(){
112  if( _blocked_fiber_count.fetch_add( -1 ) == 1 )
113  _blocked_thread.store( nullptr );
114  }
116  // copy _blocked_thread into a local so that if the thread unblocks (e.g.,
117  // because of a timeout) before we get a chance to notify it, we won't be
118  // calling notify on a null pointer
119  thread* blocked_thread = _blocked_thread.load();
120  if( blocked_thread )
121  blocked_thread->notify( shared_from_this() );
122  }
123 
124  void promise_base::_set_value(const void* s){
125  bool ready = false;
126  if( !_ready.compare_exchange_strong( ready, true ) ) //don't allow promise to be set more than once
127  return;
128  _notify();
129  auto* hdl = _compl.load();
130  if( nullptr != hdl )
131  hdl->on_complete( s, std::atomic_load( &_exceptp ) );
132  }
133 
135  auto* hdl = _compl.load();
136  while( !_compl.compare_exchange_weak( hdl, c ) );
137  delete hdl;
138  }
139 }
140 
virtual ~promise_base()
Definition: future.cpp:24
const char * get_desc() const
Definition: future.cpp:26
void set_exception(const fc::exception_ptr &e)
Definition: future.cpp:44
const char * _cancellation_reason
Definition: future.hpp:101
std::shared_ptr< promise_base > ptr
Definition: future.hpp:63
void _wait(const microseconds &timeout_us)
Definition: future.cpp:49
void _set_value(const void *v)
Definition: future.cpp:124
void _wait_until(const time_point &timeout_us)
Definition: future.cpp:55
virtual void cancel(const char *reason FC_CANCELATION_REASON_DEFAULT_ARG)
Definition: future.cpp:30
void _notify()
Definition: future.cpp:115
promise_base(const char *desc FC_TASK_NAME_DEFAULT_ARG)
Definition: future.cpp:11
std::shared_ptr< exception > exception_ptr
Definition: exception.hpp:131
static thread & current()
Definition: thread.cpp:125
bool error() const
Definition: future.cpp:40
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
Definition: exception.hpp:378
Defines exception&#39;s used by fc.
bool ready() const
Definition: future.cpp:37
void _on_complete(detail::completion_handler *c)
Definition: future.cpp:134
Definition: api.hpp:15
static time_point maximum()
Definition: time.hpp:48
static microseconds maximum()
Definition: time.hpp:15
static time_point now()
Definition: time.cpp:13