BitShares-Core  5.0.0
BitShares blockchain implementation and command-line interface software
thread.cpp
Go to the documentation of this file.
1 #include <fc/thread/thread.hpp>
2 #include <fc/io/sstream.hpp>
3 #include <fc/log/logger.hpp>
4 #include "thread_d.hpp"
5 
6 #include <iostream>
7 
8 #if defined(_MSC_VER) && !defined(NDEBUG)
9 # include <windows.h>
10 const DWORD MS_VC_EXCEPTION=0x406D1388;
11 
12 #pragma pack(push,8)
13 typedef struct tagTHREADNAME_INFO
14 {
15  DWORD dwType; // Must be 0x1000.
16  LPCSTR szName; // Pointer to name (in user addr space).
17  DWORD dwThreadID; // Thread ID (-1=caller thread).
18  DWORD dwFlags; // Reserved for future use, must be zero.
19 } THREADNAME_INFO;
20 #pragma pack(pop)
21 
22 static void set_thread_name(const char* threadName)
23 {
24  THREADNAME_INFO info;
25  info.dwType = 0x1000;
26  info.szName = threadName;
27  info.dwThreadID = -1;
28  info.dwFlags = 0;
29 
30  __try
31  {
32  RaiseException(MS_VC_EXCEPTION, 0, sizeof(info)/sizeof(ULONG_PTR), (ULONG_PTR*)&info);
33  }
34  __except(EXCEPTION_EXECUTE_HANDLER)
35  {
36  }
37 }
38 #elif defined(__linux__)
39 # include <pthread.h>
// Names the calling thread (Linux).
//
// Linux limits thread names to 16 bytes including the terminating NUL and
// pthread_setname_np() fails with ERANGE (leaving the name unchanged) for
// anything longer, so the name is truncated to its first 15 characters
// before it is applied.
static void set_thread_name(const char* threadName)
{
  char short_name[16];
  unsigned length = 0;
  while( length + 1 < sizeof(short_name) && threadName[length] != '\0' )
  {
    short_name[length] = threadName[length];
    ++length;
  }
  short_name[length] = '\0';

  pthread_setname_np(pthread_self(), short_name);
}
44 #elif defined(__APPLE__) && !defined(NDEBUG)
45 # include <pthread.h>
46 static void set_thread_name(const char* threadName)
47 {
48  pthread_setname_np(threadName);
49 }
50 #else
// Fallback used when none of the platform branches above is selected:
// the requested name is ignored.
static void set_thread_name(const char* threadName)
{
  (void)threadName; // do nothing in release mode
}
55 #endif
56 
57 namespace fc {
58  const char* thread_name() {
59  return thread::current().name().c_str();
60  }
61  void* thread_ptr() {
62  return &thread::current();
63  }
64 
66 #ifdef _MSC_VER
67  static __declspec(thread) thread* t = NULL;
68 #else
69  static __thread thread* t = NULL;
70 #endif
71  return t;
72  }
73 
74  thread::thread( const std::string& name, thread_idle_notifier* notifier ) {
75  promise<void>::ptr p = promise<void>::create("thread start");
76  boost::thread* t = new boost::thread( [this,p,name,notifier]() {
77  try {
78  set_thread_name(name.c_str()); // set thread's name for the debugger to display
79  this->my = new thread_d( *this, notifier );
80  cleanup();
81  current_thread() = this;
82  p->set_value();
83  exec();
84  } catch ( fc::exception& e ) {
85  if( !p->ready() )
86  {
87  wlog( "unhandled exception" );
89  }
90  else
91  { // possibly shutdown?
92  std::cerr << "unhandled exception in thread '" << name << "'\n";
93  std::cerr << e.to_detail_string( log_level::warn );
94  }
95  } catch ( ... ) {
96  if( !p->ready() )
97  {
98  wlog( "unhandled exception" );
99  p->set_exception( std::make_shared<unhandled_exception>( FC_LOG_MESSAGE( warn, "unhandled exception: ${diagnostic}", ("diagnostic",boost::current_exception_diagnostic_information()) ) ) );
100  }
101  else
102  { // possibly shutdown?
103  std::cerr << "unhandled exception in thread '" << name << "'\n";
104  std::cerr << boost::current_exception_diagnostic_information() << "\n";
105  }
106  }
107  } );
108  p->wait();
109  my->boost_thread = t;
110  my->name = name;
111  }
112  thread::thread( thread_d* ) {
113  my = new thread_d(*this);
114  }
115 
116  thread::~thread() {
117  if( my && is_running() )
118  {
119  quit();
120  }
121 
122  delete my;
123  }
124 
125  thread& thread::current() {
126  if( !current_thread() )
127  current_thread() = new thread((thread_d*)0);
128  return *current_thread();
129  }
130 
131  void thread::cleanup() {
132  if ( current_thread() ) {
133  delete current_thread();
134  current_thread() = nullptr;
135  }
136  }
137 
138  const string& thread::name()const
139  {
140  return my->name;
141  }
142 
143  void thread::set_name( const std::string& n )
144  {
145  if (!is_current())
146  {
147  async([this,n](){ set_name(n); }, "set_name").wait();
148  return;
149  }
150  my->name = n;
151  set_thread_name(my->name.c_str()); // set thread's name for the debugger to display
152  }
153 
154  const char* thread::current_task_desc() const
155  {
156  if (my->current && my->current->cur_task)
157  return my->current->cur_task->get_desc();
158  return NULL;
159  }
160 
161  void thread::debug( const std::string& d ) { /*my->debug(d);*/ }
162 
163 #if defined(__linux__) || defined(__APPLE__)
164 #include <signal.h>
165 #endif
166 
167  void thread::signal(int sig)
168  {
169 #if defined(__linux__) || defined(__APPLE__)
170  pthread_kill( my->boost_thread->native_handle(), sig );
171 #endif
172  }
173 
/// Shuts this thread down.
///
/// Called from another thread: posts quit() as a task on this thread, joins
/// the attached boost thread when there is one, and returns.
/// Called on the thread itself: marks it done, sets a canceled_exception on
/// the promises of every blocked context and on every queued task, moves
/// sleeping and idle contexts to the ready list, marks all ready contexts
/// canceled, runs them until the ready heap is empty, then frees cached
/// contexts and thread-specific data.
174  void thread::quit()
175  {
176  // If quitting from a different thread, start the quit task on this thread.
177  // If we have and know our attached boost thread, wait for it to finish, then return.
178  if( !is_current() )
179  {
  // Copy the handle before posting the task, then join through the copy.
180  auto t = my->boost_thread;
181  async( [this](){quit();}, "thread::quit" );
182  if( t )
183  t->join();
184  return;
185  }
186 
187  my->done = true;
188  // We are quitting from our own thread...
189 
190  // break all promises, thread quit!
191  while( my->blocked )
192  {
193  fc::context* cur = my->blocked;
194  while( cur )
195  {
  // NOTE(review): this walk follows `next`, whereas notify() links the
  // blocked list through `next_blocked` -- confirm which link is intended.
196  fc::context* n = cur->next;
197  // this will move the context into the ready list.
198  cur->set_exception_on_blocking_promises( std::make_shared<canceled_exception>(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")) );
199 
200  cur = n;
201  }
202  if( my->blocked )
203  debug( "on quit" );
204  }
205  BOOST_ASSERT( my->blocked == 0 );
206 
  // Fail every task that has been queued but not started.
207  for (task_base* unstarted_task : my->task_pqueue)
208  unstarted_task->set_exception(std::make_shared<canceled_exception>(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")));
209  my->task_pqueue.clear();
210 
  // Fail every task waiting in the scheduled queue.
211  for (task_base* scheduled_task : my->task_sch_queue)
212  scheduled_task->set_exception(std::make_shared<canceled_exception>(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")));
213  my->task_sch_queue.clear();
214 
215 
216 
217  // move all sleep tasks to ready
218  for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i )
219  my->add_context_to_ready_list( my->sleep_pqueue[i] );
220  my->sleep_pqueue.clear();
221 
222  // move all idle tasks to ready
223  fc::context* cur = my->pt_head;
224  while( cur )
225  {
  // Save the successor and detach the node before handing it over.
226  fc::context* n = cur->next;
227  cur->next = 0;
228  my->add_context_to_ready_list( cur );
229  cur = n;
230  }
231 
232  // mark all ready tasks (should be everyone)... as canceled
233  for (fc::context* ready_context : my->ready_heap)
234  ready_context->canceled = true;
235 
236  // now that we have poked all fibers... switch to the next one and
237  // let them all quit.
238  while (!my->ready_heap.empty())
239  {
240  my->start_next_fiber(true);
241  my->check_for_timeouts();
242  }
243  my->clear_free_list();
244  my->cleanup_thread_specific_data();
245  }
246 
247  void thread::exec()
248  {
249  if( !my->current )
250  my->current = new fc::context(&fc::thread::current());
251 
252  try
253  {
254  my->process_tasks();
255  }
256  catch( canceled_exception& e )
257  {
258  dlog( "thread canceled: ${e}", ("e", e.to_detail_string()) );
259  }
260  delete my->current;
261  my->current = 0;
262  }
263 
264  bool thread::is_running()const
265  {
266  return !my->done;
267  }
268 
269  priority thread::current_priority()const
270  {
271  BOOST_ASSERT(my);
272  if( my->current )
273  return my->current->prio;
274  return priority();
275  }
276 
277  void thread::yield(bool reschedule)
278  {
279  my->check_fiber_exceptions();
280  my->start_next_fiber(reschedule);
281  my->check_fiber_exceptions();
282  }
283 
284  void thread::sleep_until( const time_point& tp )
285  {
286  if( tp <= (time_point::now()+fc::microseconds(10000)) )
287  yield(true);
288  my->yield_until( tp, false );
289  }
290 
291  int thread::wait_any_until( std::vector<promise_base::ptr>&& p, const time_point& timeout) {
292  for( size_t i = 0; i < p.size(); ++i )
293  if( p[i]->ready() )
294  return i;
295 
296  if( timeout < time_point::now() )
297  {
298  fc::stringstream ss;
299  for( auto i = p.begin(); i != p.end(); ++i )
300  ss << (*i)->get_desc() << ", ";
301 
302  FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task",ss.str()) );
303  }
304 
305  if( !my->current )
306  my->current = new fc::context(&fc::thread::current());
307 
308  for( uint32_t i = 0; i < p.size(); ++i )
309  my->current->add_blocking_promise(p[i].get(),false);
310 
311  // if not max timeout, added to sleep pqueue
312  if( timeout != time_point::maximum() )
313  {
314  my->current->resume_time = timeout;
315  my->sleep_pqueue.push_back(my->current);
316  std::push_heap( my->sleep_pqueue.begin(),
317  my->sleep_pqueue.end(),
319  }
320 
321  my->add_to_blocked( my->current );
322  my->start_next_fiber();
323 
324  for( auto i = p.begin(); i != p.end(); ++i )
325  my->current->remove_blocking_promise(i->get());
326 
327  my->check_fiber_exceptions();
328 
329  for( uint32_t i = 0; i < p.size(); ++i )
330  if( p[i]->ready() )
331  return i;
332 
333  return -1;
334  }
335 
336  void thread::async_task( task_base* t, const priority& p ) {
337  async_task( t, p, time_point::min() );
338  }
339 
340  void thread::poke() {
341  boost::unique_lock<boost::mutex> lock(my->task_ready_mutex);
342  my->task_ready.notify_one();
343  }
344 
345  void thread::async_task( task_base* t, const priority& p, const time_point& tp ) {
346  assert(my);
347  if ( !is_running() )
348  {
349  FC_THROW_EXCEPTION( canceled_exception, "Thread is not running.");
350  }
351  t->_when = tp;
352  task_base* stale_head = my->task_in_queue.load(boost::memory_order_relaxed);
353  do { t->_next = stale_head;
354  }while( !my->task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) );
355 
356  // Because only one thread can post the 'first task', only that thread will attempt
357  // to aquire the lock and therefore there should be no contention on this lock except
358  // when *this thread is about to block on a wait condition.
359  if( this != &current() && !stale_head ) {
360  boost::unique_lock<boost::mutex> lock(my->task_ready_mutex);
361  my->task_ready.notify_one();
362  }
363  }
364 
365  void yield() {
366  thread::current().yield();
367  }
368  void usleep( const microseconds& u ) {
369  thread::current().sleep_until( time_point::now() + u);
370  }
371  void sleep_until( const time_point& tp ) {
372  thread::current().sleep_until(tp);
373  }
374 
375  void exec()
376  {
377  return thread::current().exec();
378  }
379 
380  int wait_any( std::vector<promise_base::ptr>&& v, const microseconds& timeout_us )
381  {
382  return thread::current().wait_any_until( std::move(v), time_point::now() + timeout_us );
383  }
384 
385  int wait_any_until( std::vector<promise_base::ptr>&& v, const time_point& tp )
386  {
387  return thread::current().wait_any_until( std::move(v), tp );
388  }
389 
390  void thread::wait_until( promise_base::ptr&& p, const time_point& timeout )
391  {
392  if( p->ready() )
393  return;
394 
395  if( timeout < time_point::now() )
396  FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task", p->get_desc()) );
397 
398  if( !my->current )
399  my->current = new fc::context(&fc::thread::current());
400 
401  my->current->add_blocking_promise(p.get(), true);
402 
403  // if not max timeout, added to sleep pqueue
404  if( timeout != time_point::maximum() )
405  {
406  my->current->resume_time = timeout;
407  my->sleep_pqueue.push_back(my->current);
408  std::push_heap( my->sleep_pqueue.begin(),
409  my->sleep_pqueue.end(),
411  }
412 
413  my->add_to_blocked( my->current );
414 
415  my->start_next_fiber();
416 
417  my->current->remove_blocking_promise(p.get());
418 
419  my->check_fiber_exceptions();
420  }
421 
/// Wakes the contexts of this thread that are blocked on promise `p`.
///
/// Called from another thread, the notification is re-posted to this thread
/// as a priority::max() task. On the owning thread, the blocked list is
/// walked through `next_blocked`; each context for which try_unblock(p)
/// succeeds is removed from the sleep queue (if present), unlinked from the
/// blocked list and added to the ready list.
///
/// @param p  promise that has become ready (asserted)
422  void thread::notify( const promise_base::ptr& p )
423  {
424  BOOST_ASSERT(p->ready());
425  if( !is_current() )
426  {
427  this->async( [=](){ notify(p); }, "notify", priority::max() );
428  return;
429  }
430  // TODO: store a list of blocked contexts with the promise
431  // to accelerate the lookup.... unless it introduces contention...
432 
433  // iterate over all blocked contexts
434 
435 
436  fc::context* cur_blocked = my->blocked;
437  fc::context* prev_blocked = 0;
438  while( cur_blocked )
439  {
440  // if the blocked context is waiting on this promise
441  if( cur_blocked->try_unblock( p.get() ) )
442  {
443  // remove it from the blocked list.
444 
445  // remove this context from the sleep queue...
446  for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i )
447  {
448  if( my->sleep_pqueue[i] == cur_blocked )
449  {
  // Clear its blocking promises, overwrite the slot with the last
  // element, shrink the vector and rebuild the heap.
450  my->sleep_pqueue[i]->blocking_prom.clear();
451  my->sleep_pqueue[i] = my->sleep_pqueue.back();
452  my->sleep_pqueue.pop_back();
453  std::make_heap( my->sleep_pqueue.begin(),my->sleep_pqueue.end(), sleep_priority_less() );
454  break;
455  }
456  }
  // Unlink `cur` and advance cur_blocked to its successor;
  // prev_blocked stays where it is.
457  auto cur = cur_blocked;
458  if( prev_blocked )
459  {
460  prev_blocked->next_blocked = cur_blocked->next_blocked;
461  cur_blocked = prev_blocked->next_blocked;
462  }
463  else
464  {
  // `cur` was the head of the blocked list.
465  my->blocked = cur_blocked->next_blocked;
466  cur_blocked = my->blocked;
467  }
468  cur->next_blocked = 0;
469  my->add_context_to_ready_list( cur );
470  }
471  else
472  { // go to the next blocked task
473  prev_blocked = cur_blocked;
474  cur_blocked = cur_blocked->next_blocked;
475  }
476  }
477  }
478 
479  bool thread::is_current()const
480  {
481  return this == &current();
482  }
483 
484  void thread::notify_task_has_been_canceled()
485  {
486  async( [this](){ my->notify_task_has_been_canceled(); }, "notify_task_has_been_canceled", priority::max() );
487  }
488 
489  void thread::unblock(fc::context* c)
490  {
491  my->unblock(c);
492  }
493 
494  namespace detail {
495  idle_guard::idle_guard( thread_d* t ) : notifier(t->notifier)
496  {
497  if( notifier )
498  {
499  task_base* work = notifier->idle();
500  if( work )
501  {
502  task_base* stale_head = t->task_in_queue.load(boost::memory_order_relaxed);
503  do {
504  work->_next = stale_head;
505  } while( !t->task_in_queue.compare_exchange_weak( stale_head, work, boost::memory_order_release ) );
506  }
507  }
508  }
510  {
511  if( notifier ) notifier->busy();
512  }
513  }
514 
515 #ifdef _MSC_VER
516  /* support for providing a structured exception handler for async tasks */
517  namespace detail
518  {
519  unhandled_exception_filter_type unhandled_structured_exception_filter = nullptr;
520  }
521  void set_unhandled_structured_exception_filter(unhandled_exception_filter_type new_filter)
522  {
523  detail::unhandled_structured_exception_filter = new_filter;
524  }
525  unhandled_exception_filter_type get_unhandled_structured_exception_filter()
526  {
527  return detail::unhandled_structured_exception_filter;
528  }
529 #endif // _MSC_VER
530 } // end namespace fc
void sleep_until(const time_point &tp)
Definition: thread.cpp:371
fc::context * next
Definition: context.hpp:192
virtual task_base * idle()=0
time_point _when
Definition: task.hpp:62
auto async(Functor &&f, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> fc::future< decltype(f())>
Definition: thread.hpp:227
fc::context * next_blocked
Definition: context.hpp:190
const char * thread_name()
Definition: thread.cpp:58
void set_exception(const fc::exception_ptr &e)
Definition: future.cpp:44
T wait(boost::signals2::signal< void(T)> &sig, const microseconds &timeout_us=microseconds::maximum())
Definition: signals.hpp:38
std::shared_ptr< promise_base > ptr
Definition: future.hpp:63
void yield()
Definition: thread.cpp:365
Used to generate a useful error report when an exception is thrown. At each level in the stack where t...
Definition: exception.hpp:56
std::string to_detail_string(log_level ll=log_level::all) const
Definition: exception.cpp:183
void set_exception_on_blocking_promises(const exception_ptr &e)
Definition: context.hpp:171
#define wlog(FORMAT,...)
Definition: logger.hpp:123
void usleep(const microseconds &u)
Definition: thread.cpp:368
virtual std::shared_ptr< exception > dynamic_copy_exception() const
Definition: exception.cpp:267
bool try_unblock(promise_base *p)
Definition: context.hpp:142
void exec()
Definition: thread.cpp:375
const T & wait(const microseconds &timeout=microseconds::maximum())
Definition: future.hpp:127
std::string str()
Definition: sstream.cpp:33
static thread & current()
Definition: thread.cpp:125
void set_value(const T &v)
Definition: future.hpp:136
boost::atomic< task_base * > task_in_queue
Definition: thread_d.hpp:97
virtual void busy()=0
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
Definition: exception.hpp:378
task_base * _next
Definition: task.hpp:65
int wait_any(const fc::future< T1 > &f1, const fc::future< T2 > &f2, const microseconds timeout_us=microseconds::maximum())
Definition: thread.hpp:220
void * thread_ptr()
Definition: thread.cpp:61
bool ready() const
Definition: future.cpp:37
#define dlog(FORMAT,...)
Definition: logger.hpp:100
std::vector< blocked_promise > blocking_prom
Definition: context.hpp:187
int wait_any_until(std::vector< promise_base::ptr > &&v, const time_point &tp)
Definition: thread.cpp:385
const string & name() const
returns the name given by set_name() for this thread
Definition: thread.cpp:138
Definition: api.hpp:15
thread *& current_thread()
Definition: thread.cpp:65
boost::signals2::signal< T > signal
Definition: signals.hpp:20
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
std::shared_ptr< promise< T > > ptr
Definition: future.hpp:111
cerr_t & cerr
Definition: iostream.cpp:176
bool canceled
Definition: context.hpp:194