BitShares-Core  4.0.0
BitShares blockchain implementation and command-line interface software
thread.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #define FC_CONTEXT_STACK_SIZE (2048*1024)
4 
5 #include <fc/thread/task.hpp>
6 
7 namespace fc {
8  class time_point;
9  class microseconds;
10 
11  namespace detail
12  {
13  class worker_pool;
14  void* get_thread_specific_data(unsigned slot);
15  void set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
17  void* get_task_specific_data(unsigned slot);
18  void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
19  }
20 
25  public:
26  virtual ~thread_idle_notifier() {}
27 
32  virtual task_base* idle() = 0;
36  virtual void busy() = 0;
37  };
38 
39  class thread {
40  public:
41  thread( const std::string& name = "", thread_idle_notifier* notifier = 0 );
42  thread( thread&& m ) = delete;
43  thread& operator=(thread&& t ) = delete;
44 
51  static thread& current();
52  static void cleanup();
53 
54 
58  const string& name()const;
59 
63  void set_name( const string& n );
64 
65  const char* current_task_desc() const;
66 
76  void debug( const std::string& d );
77 
78 
86  template<typename Functor>
87  auto async( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future<decltype(f())> {
88  typedef decltype(f()) Result;
89  typedef typename std::remove_const_t< std::remove_reference_t<Functor> > FunctorType;
91  task<Result,sizeof(FunctorType)>::create( std::forward<Functor>(f), desc );
92  tsk->retain(); // HERE BE DRAGONS
93  fc::future<Result> r( std::dynamic_pointer_cast< promise<Result> >(tsk) );
94  async_task(tsk.get(),prio);
95  return r;
96  }
97  void poke();
98 
99 
109  template<typename Functor>
110  auto schedule( Functor&& f, const fc::time_point& when,
111  const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future<decltype(f())> {
112  typedef decltype(f()) Result;
113  typename task<Result,sizeof(Functor)>::ptr tsk =
114  task<Result,sizeof(Functor)>::create( std::forward<Functor>(f), desc );
115  tsk->retain(); // HERE BE DRAGONS
116  fc::future<Result> r( std::dynamic_pointer_cast< promise<Result> >(tsk) );
117  async_task(tsk.get(),prio,when);
118  return r;
119  }
120 
133  void quit();
134 
138  void signal(int);
139 
143  bool is_running()const;
144  bool is_current()const;
145 
146  priority current_priority()const;
147  ~thread();
148 
149  template<typename T1, typename T2>
150  int wait_any( const fc::future<T1>& f1, const fc::future<T2>& f2, const microseconds& timeout_us = microseconds::maximum()) {
151  std::vector<fc::promise_base::ptr> proms(2);
152  proms[0] = std::static_pointer_cast<fc::promise_base>(f1.m_prom);
153  proms[1] = std::static_pointer_cast<fc::promise_base>(f2.m_prom);
154  return wait_any_until(std::move(proms), fc::time_point::now()+timeout_us );
155  }
156  private:
157  thread( class thread_d* ); // parameter is ignored, will create a new thread_d
158  friend class promise_base;
159  friend class task_base;
160  friend class thread_d;
161  friend class mutex;
162  friend class detail::worker_pool;
163  friend void* detail::get_thread_specific_data(unsigned slot);
164  friend void detail::set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
166  friend void* detail::get_task_specific_data(unsigned slot);
167  friend void detail::set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
168 #ifndef NDEBUG
170 #endif
171  friend void yield();
172  friend void usleep(const microseconds&);
173  friend void sleep_until(const time_point&);
174  friend void exec();
175  friend int wait_any( std::vector<promise_base::ptr>&& v, const microseconds& );
176  friend int wait_any_until( std::vector<promise_base::ptr>&& v, const time_point& tp );
177  void wait_until( promise_base::ptr && v, const time_point& tp );
178  void notify( const promise_base::ptr& v );
179 
180  void yield(bool reschedule=true);
181  void sleep_until( const time_point& t );
182  void exec();
183  int wait_any_until( std::vector<promise_base::ptr>&& v, const time_point& );
184 
185  void async_task( task_base* t, const priority& p );
186  void async_task( task_base* t, const priority& p, const time_point& tp );
187 
188  void notify_task_has_been_canceled();
189  void unblock(fc::context* c);
190 
191  class thread_d* my;
192  };
193 
197  void yield();
198 
202  void usleep( const microseconds& u );
203 
207  void sleep_until( const time_point& tp );
208 
212  void exec();
213 
219  template<typename T1, typename T2>
220  int wait_any( const fc::future<T1>& f1, const fc::future<T2>& f2, const microseconds timeout_us = microseconds::maximum()) {
221  return fc::thread::current().wait_any(f1,f2,timeout_us);
222  }
223  int wait_any( std::vector<promise_base::ptr>&& v, const microseconds& timeout_us = microseconds::maximum() );
224  int wait_any_until( std::vector<promise_base::ptr>&& v, const time_point& tp );
225 
226  template<typename Functor>
227  auto async( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future<decltype(f())> {
228  return fc::thread::current().async( std::forward<Functor>(f), desc, prio );
229  }
230  template<typename Functor>
231  auto schedule( Functor&& f, const fc::time_point& t, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future<decltype(f())> {
232  return fc::thread::current().schedule( std::forward<Functor>(f), t, desc, prio );
233  }
234 
240  template<typename Functor>
241  auto sync_call( thread* t, Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> decltype(f())
242  {
243  if( t == nullptr )
244  return f();
245 
246  typedef decltype(f()) Result;
247  future<Result> r = t->async( f, desc, prio );
248  return r.wait();
249  }
250 
251 } // end namespace fc
252 
253 #ifdef _MSC_VER
254 struct _EXCEPTION_POINTERS;
255 
256 namespace fc {
257  /* There's something about the setup of the stacks created for fc::async tasks
258  * that screws up the global structured exception filters installed by
259  * SetUnhandledExceptionFilter(). The only way I've found to catch an
260  * unhaldned structured exception thrown in an async task is to put a
261  * __try/__except block inside the async task.
262  * We do just that, and if a SEH escapes outside the function running
263  * in the async task, fc will call an exception filter privided by
264  * set_unhandled_structured_exception_filter(), passing as arguments
265  * the result of GetExceptionCode() and GetExceptionInformation().
266  *
267  * Right now there is only one global exception filter, used for any
268  * async task.
269  */
270  typedef int (*unhandled_exception_filter_type)(unsigned, _EXCEPTION_POINTERS*);
271  void set_unhandled_structured_exception_filter(unhandled_exception_filter_type new_filter);
272  unhandled_exception_filter_type get_unhandled_structured_exception_filter();
273 }
274 #endif
275 
void sleep_until(const time_point &tp)
Definition: thread.cpp:371
void set_task_specific_data(unsigned slot, void *new_value, void(*cleanup)(void *))
void retain()
Definition: task.cpp:105
auto async(Functor &&f, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> fc::future< decltype(f())>
Definition: thread.hpp:227
virtual ~thread_idle_notifier()
Definition: thread.hpp:26
a placeholder for the result of an asynchronous operation.
Definition: future.hpp:211
unsigned get_next_unused_task_storage_slot()
void * get_task_specific_data(unsigned slot)
std::shared_ptr< promise_base > ptr
Definition: future.hpp:63
void yield()
Definition: thread.cpp:365
auto async(Functor &&f, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> fc::future< decltype(f())>
Definition: thread.hpp:87
auto schedule(Functor &&f, const fc::time_point &when, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> fc::future< decltype(f())>
Definition: thread.hpp:110
auto sync_call(thread *t, Functor &&f, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> decltype(f())
Definition: thread.hpp:241
void usleep(const microseconds &u)
Definition: thread.cpp:368
mutex
Definition: mutex.hpp:91
std::shared_ptr< task< R, FunctorSize > > ptr
Definition: task.hpp:118
void exec()
Definition: thread.cpp:375
int wait_any(const fc::future< T1 > &f1, const fc::future< T2 > &f2, const microseconds &timeout_us=microseconds::maximum())
Definition: thread.hpp:150
static thread & current()
Definition: thread.cpp:125
void * get_thread_specific_data(unsigned slot)
#define FC_TASK_NAME_DEFAULT_ARG
Definition: future.hpp:14
int wait_any(const fc::future< T1 > &f1, const fc::future< T2 > &f2, const microseconds timeout_us=microseconds::maximum())
Definition: thread.hpp:220
int wait_any_until(std::vector< promise_base::ptr > &&v, const time_point &tp)
Definition: thread.cpp:385
const T & wait(const microseconds &timeout=microseconds::maximum()) const
Definition: future.hpp:228
Definition: api.hpp:15
static microseconds maximum()
Definition: time.hpp:15
static time_point now()
Definition: time.cpp:13
boost::signals2::signal< T > signal
Definition: signals.hpp:20
auto schedule(Functor &&f, const fc::time_point &t, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> fc::future< decltype(f())>
Definition: thread.hpp:231
void set_thread_specific_data(unsigned slot, void *new_value, void(*cleanup)(void *))