BitShares-Core  4.0.0
BitShares blockchain implementation and command-line interface software
thread_d.hpp
Go to the documentation of this file.
1 #include <fc/thread/thread.hpp>
2 #include <fc/stacktrace.hpp>
3 #include <fc/time.hpp>
4 #include <boost/thread.hpp>
5 #include "context.hpp"
6 #include <boost/thread/condition_variable.hpp>
7 #include <boost/thread.hpp>
8 #include <boost/atomic.hpp>
9 
10 #include <sstream>
11 #include <vector>
12 
13 namespace fc {
      // NOTE(review): the enclosing "struct sleep_priority_less {" header line is
      // missing from this scrape; this is its call operator.
      // Heap comparator: '>' on resume_time turns the std max-heap into a
      // min-heap, so the context due to wake earliest sits at the front.
      bool operator()( const context::ptr& a, const context::ptr& b ) {
         return a->resume_time > b->resume_time;
      }
   };
19 
   namespace detail {
      /// RAII helper: constructed just before the scheduler blocks waiting for
      /// work, destroyed when work arrives. Its ctor/dtor (defined elsewhere)
      /// signal the thread's thread_idle_notifier, if one was registered.
      class idle_guard {
      public:
         explicit idle_guard( thread_d* t );
         ~idle_guard();
      private:
         thread_idle_notifier* notifier; // observed, not owned
      };
   }
29 
   /// Private implementation of fc::thread: a cooperative (fiber-based)
   /// task scheduler running inside one boost::thread.
   class thread_d {

      public:
         // payload passed through bc::jump_fcontext (boost >= 1.61):
         // {owning thread_d (or nullptr), context being suspended}
         using context_pair = std::pair<thread_d*, fc::context*>;

         // NOTE(review): the constructor's signature line is missing from this
         // scrape; per the index it is:
         //   thread_d( fc::thread& s, thread_idle_notifier* n = 0 )
           :self(s), boost_thread(0),
            task_in_queue(0),
            next_posted_num(1),
            done(false),
            current(0),
            pt_head(0),
            blocked(0),
            next_unused_task_storage_slot(0),
            notifier(n)
#ifndef NDEBUG
           ,non_preemptable_scope_count(0)
#endif
         {
            // give every thread a short unique debug name: th_a, th_b, ...
            static boost::atomic<int> cnt(0);
            name = std::string("th_") + char('a'+cnt++);
//          printf("thread=%p\n",this);
         }
53 
         // NOTE(review): the destructor's signature line (~thread_d()) is
         // missing from this scrape. Tears down all contexts still owned by
         // this scheduler and detaches the underlying OS thread.
         {
            delete current;
            current = nullptr;
            fc::context* temp;
            // release any task still attached to a ready context before
            // freeing the context itself
            for (fc::context* ready_context : ready_heap)
            {
               if (ready_context->cur_task)
               {
                  ready_context->cur_task->release();
                  ready_context->cur_task = nullptr;
               }
               delete ready_context;
            }
            ready_heap.clear();
            // free the singly-linked list of blocked contexts
            while (blocked)
            {
               temp = blocked->next;
               delete blocked;
               blocked = temp;
            }
            // NOTE(review): the pt_head free list is deliberately NOT freed
            // here (code commented out) — presumably to avoid touching stacks
            // that may still unwind; confirm before re-enabling.
            /*
            while (pt_head)
            {
               temp = pt_head->next;
               delete pt_head;
               pt_head = temp;
            }
            */
            //ilog("");
            if (boost_thread)
            {
               // detach rather than join: the dtor must not block
               boost_thread->detach();
               delete boost_thread;
            }
         }
90 
         fc::thread& self;             // public wrapper owning this impl
         boost::thread* boost_thread;  // underlying OS thread (may be null for main)
         // NOTE(review): line 93 (stack_allocator stack_alloc;) is missing
         // from this scrape per the index.
         boost::condition_variable task_ready;  // signaled when new work is posted
         boost::mutex task_ready_mutex;         // guards the idle wait below

         boost::atomic<task_base*> task_in_queue;  // lock-free inbox of newly posted tasks (reverse-ordered list)
         std::vector<task_base*> task_pqueue; // heap of tasks that have never started, ordered by priority & scheduling time
         uint64_t next_posted_num; // each task or context gets assigned a number in the order it is ready to execute, tracked here
         std::vector<task_base*> task_sch_queue; // heap of tasks that have never started but are scheduled for a time in the future, ordered by the time they should be run
         std::vector<fc::context*> sleep_pqueue; // heap of running tasks that have sleeped, ordered by the time they should resume
         std::vector<fc::context*> free_list; // list of unused contexts that are ready for deletion

         bool done;          // set when the thread is shutting down
         std::string name;   // debug name (th_a, th_b, ...)
         fc::context* current; // the currently-executing task in this thread

         fc::context* pt_head; // list of contexts that can be reused for new tasks

         std::vector<fc::context*> ready_heap; // priority heap of contexts that are ready to run

         fc::context* blocked; // linked list of contexts (using 'next_blocked') blocked on promises via wait()

         // values for thread specific data objects for this thread
         std::vector<detail::specific_data_info> thread_specific_data;
         // values for task_specific data for code executing on a thread that's
         // not a task launched by async (usually the default task on the main
         // thread in a process)
         std::vector<detail::specific_data_info> non_task_specific_data;
         // NOTE(review): lines 120-123 (next_unused_task_storage_slot and
         // notifier declarations) are missing from this scrape per the index.

#ifndef NDEBUG
         // NOTE(review): line 125 (unsigned non_preemptable_scope_count;) is
         // missing from this scrape per the index.
#endif
#if 0
         // Dead debugging helper (compiled out): dumps the ready and blocked
         // context chains to stderr. NOTE(review): references 'ready_head',
         // which no longer exists (replaced by ready_heap) — would need
         // updating before re-enabling.
         void debug( const std::string& s ) {
            return;
            //boost::unique_lock<boost::mutex> lock(log_mutex());

            fc::cerr<<"--------------------- "<<s.c_str()<<" - "<<current;
            if( current && current->cur_task ) fc::cerr<<'('<<current->cur_task->get_desc()<<')';
            fc::cerr<<" ---------------------------\n";
            fc::cerr<<"  Ready\n";
            fc::context* c = ready_head;
            while( c ) {
               fc::cerr<<"    "<<c;
               if( c->cur_task ) fc::cerr<<'('<<c->cur_task->get_desc()<<')';
               fc::context* p = c->caller_context;
               while( p ) {
                  fc::cerr<<"  ->  "<<p;
                  p = p->caller_context;
               }
               fc::cerr<<"\n";
               c = c->next;
            }
            fc::cerr<<"  Blocked\n";
            c = blocked;
            while( c ) {
               fc::cerr<<"   ctx: "<< c;
               if( c->cur_task ) fc::cerr<<'('<<c->cur_task->get_desc()<<')';
               fc::cerr << " blocked on prom: ";
               for( uint32_t i = 0; i < c->blocking_prom.size(); ++i ) {
                  fc::cerr<<c->blocking_prom[i].prom<<'('<<c->blocking_prom[i].prom->get_desc()<<')';
                  if( i + 1 < c->blocking_prom.size() ) {
                     fc::cerr<<",";
                  }
               }

               fc::context* p = c->caller_context;
               while( p ) {
                  fc::cerr<<"  ->  "<<p;
                  p = p->caller_context;
               }
               fc::cerr<<"\n";
               c = c->next_blocked;
            }
            fc::cerr<<"-------------------------------------------------\n";
         }
#endif
173  // insert at front of blocked linked list
174  inline void add_to_blocked( fc::context* c )
175  {
176  c->next_blocked = blocked;
177  blocked = c;
178  }
179 
         // NOTE(review): the signature line is missing from this scrape; per
         // the index it is: void pt_push_back( fc::context* c )
         // Returns a finished context to the head of the reuse list (pt_head).
         {
            c->next = pt_head;
            pt_head = c;
            /*
            fc::context* n = pt_head;
            int i = 0;
            while( n ) {
               ++i;
               n = n->next;
            }
            wlog( "idle context...%2%  %1%", c, i );
            */
         }
194 
         // NOTE(review): the signature line is missing from this scrape; per
         // the index it is: fc::context::ptr ready_pop_front()
         // Pops and returns the best (highest-priority, oldest) ready context.
         // Precondition: ready_heap is non-empty (callers check first).
         {
            fc::context* highest_priority_context = ready_heap.front();
            std::pop_heap(ready_heap.begin(), ready_heap.end(), task_priority_less());
            ready_heap.pop_back();
            return highest_priority_context;
         }
202 
203  void add_context_to_ready_list(context* context_to_add, bool at_end = false)
204  {
205 
206  context_to_add->context_posted_num = next_posted_num++;
207  ready_heap.push_back(context_to_add);
208  std::push_heap(ready_heap.begin(), ready_heap.end(), task_priority_less());
209  }
210 
         // NOTE(review): the "struct task_priority_less" header line is
         // missing from this scrape.
         // Heap comparator for tasks/contexts: higher priority wins; among
         // equal priorities, the smaller posted number (older item) wins,
         // giving FIFO order within a priority band.
         {
            bool operator()(const task_base* a, const task_base* b) const
            {
               return a->_prio.value < b->_prio.value ? true :
                         (a->_prio.value > b->_prio.value ? false :
                          a->_posted_num > b->_posted_num);
            }
            bool operator()(const task_base* a, const context* b) const
            {
               return a->_prio.value < b->prio.value ? true :
                         (a->_prio.value > b->prio.value ? false :
               // NOTE(review): the final comparison line (presumably
               // a->_posted_num > b->context_posted_num) is missing here.
            }
            bool operator()(const context* a, const task_base* b) const
            {
               return a->prio.value < b->_prio.value ? true :
                         (a->prio.value > b->_prio.value ? false :
               // NOTE(review): the final comparison line is missing here.
            }
            bool operator()(const context* a, const context* b) const
            {
               return a->prio.value < b->prio.value ? true :
                         (a->prio.value > b->prio.value ? false :
               // NOTE(review): the final comparison line is missing here.
            }
         };
238 
         // NOTE(review): the "struct task_when_less" header and the operator's
         // signature line (bool operator()(task_base* a, task_base* b), per
         // the index) are missing from this scrape.
         // Heap comparator: min-heap on a task's scheduled run time (_when).
         {
            {
               return a->_when > b->_when;
            }
         };
246 
         /// Distribute a linked list of newly-posted tasks (taken from
         /// task_in_queue) into task_pqueue (ready now) or task_sch_queue
         /// (scheduled for the future). Must run on the owning thread.
         void enqueue( task_base* t )
         {
            time_point now = time_point::now();
            task_base* cur = t;

            // the linked list of tasks passed to enqueue is in the reverse order of
            // what you'd expect -- the first task to be scheduled is at the end of
            // the list.  We'll rectify the ordering by assigning the _posted_num
            // in reverse order
            unsigned num_ready_tasks = 0;
            while (cur)
            {
               if (cur->_when <= now)
                  ++num_ready_tasks;
               cur = cur->_next;
            }

            cur = t;
            next_posted_num += num_ready_tasks;
            unsigned tasks_posted = 0;
            while (cur)
            {
               if (cur->_when > now)
               {
                  // not due yet: park on the time-ordered scheduled heap
                  task_sch_queue.push_back(cur);
                  std::push_heap(task_sch_queue.begin(),
                                 task_sch_queue.end(), task_when_less());
               }
               else
               {
                  // due now: counting down from next_posted_num reverses the
                  // list order so earlier-posted tasks get smaller numbers
                  cur->_posted_num = next_posted_num - (++tasks_posted);
                  task_pqueue.push_back(cur);
                  std::push_heap(task_pqueue.begin(),
                                 task_pqueue.end(), task_priority_less());
                  BOOST_ASSERT(this == thread::current().my);
               }
               cur = cur->_next;
            }
         }
286 
         // NOTE(review): the signature line is missing from this scrape; per
         // the index it is: void move_newly_scheduled_tasks_to_task_pqueue()
         // Drains the lock-free inbox and promotes now-due scheduled tasks.
         {
            BOOST_ASSERT(this == thread::current().my);

            // first, if there are any new tasks on 'task_in_queue', which is tasks that
            // have been just been async or scheduled, but we haven't processed them.
            // move them into the task_sch_queue or task_pqueue, as appropriate

            //DLN: changed from memory_order_consume for boost 1.55.
            //This appears to be safest replacement for now, maybe
            //can be changed to relaxed later, but needs analysis.
            task_base* pending_list = task_in_queue.exchange(0, boost::memory_order_seq_cst);
            if (pending_list)
               enqueue(pending_list);

            // second, walk through task_sch_queue and move any scheduled tasks that are now
            // able to run (because their scheduled time has arrived) to task_pqueue

            while (!task_sch_queue.empty() &&
                   task_sch_queue.front()->_when <= time_point::now())
            {
               task_base* ready_task = task_sch_queue.front();
               std::pop_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less());
               task_sch_queue.pop_back();

               // number it now that it is runnable, then heap by priority
               ready_task->_posted_num = next_posted_num++;
               task_pqueue.push_back(ready_task);
               std::push_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less());
            }
         }
317 
         // NOTE(review): the signature line is missing from this scrape; per
         // the index it is: task_base* dequeue()
         // Pops and returns the best runnable task. Precondition: task_pqueue
         // is non-empty (asserted below).
         {
            // get a new task
            BOOST_ASSERT( this == thread::current().my );

            assert(!task_pqueue.empty());
            task_base* p = task_pqueue.front();
            std::pop_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less() );
            task_pqueue.pop_back();
            return p;
         }
329 
         // NOTE(review): the signature line is missing from this scrape; per
         // the index it is: bool process_canceled_tasks()
         // Runs (to propagate the cancellation) and releases every canceled
         // task still sitting on the scheduled queue. Returns true if any
         // task was removed (the heap is rebuilt in that case).
         {
            bool canceled_task = false;
            for( auto task_itr = task_sch_queue.begin();
                 task_itr != task_sch_queue.end();
                 )
            {
               if( (*task_itr)->canceled() )
               {
                  // run() lets the task observe its cancellation and settle
                  // its promise before the reference is dropped
                  (*task_itr)->run();
                  (*task_itr)->release(); // HERE BE DRAGONS
                  task_itr = task_sch_queue.erase(task_itr);
                  canceled_task = true;
                  continue;
               }
               ++task_itr;
            }

            if( canceled_task )
               std::make_heap( task_sch_queue.begin(), task_sch_queue.end(), task_when_less() );

            return canceled_task;
         }
353 
         // NOTE(review): the signature line is missing from this scrape; per
         // the index it is: void check_fiber_exceptions()
         // Throws canceled_exception if the current context was canceled or
         // the whole thread is shutting down; called after every resume.
         {
            if( current && current->canceled )
            {
#ifdef NDEBUG
               FC_THROW_EXCEPTION( canceled_exception, "" );
#else
               FC_THROW_EXCEPTION( canceled_exception, "cancellation reason: ${reason}", ("reason", current->cancellation_reason ? current->cancellation_reason : "[none given]"));
#endif
            }
            else if( done )
            {
               ilog( "throwing canceled exception" );
               FC_THROW_EXCEPTION( canceled_exception, "cancellation reason: thread quitting" );
               // BOOST_THROW_EXCEPTION( thread_quit() );
            }
         }
375 
         /// Suspend the current context and switch to the next runnable one
         /// (or to a fresh/recycled task-processing context if none is ready).
         /// If 'reschedule' is true the suspended context is put back on the
         /// ready list so it resumes once everything else has had a turn.
         /// Returns true after this context is eventually resumed.
         bool start_next_fiber( bool reschedule = false )
         {
            /* If this assert fires, it means you are executing an operation that is causing
             * the current task to yield, but there is a ASSERT_TASK_NOT_PREEMPTED() in effect
             * (somewhere up the stack) */
            assert(non_preemptable_scope_count == 0);

            /* If this assert fires, it means you are causing the current task to yield while
             * in the middle of handling an exception.  The boost::context library's behavior
             * is not well-defined in this case, and this has the potential to corrupt the
             * exception stack, often resulting in a crash very soon after this */
            /* NB: At least on Win64, this only catches a yield while in the body of
             * a catch block; it fails to catch a yield while unwinding the stack, which
             * is probably just as likely to cause crashes */
            if( std::current_exception() != std::exception_ptr() )
            {
               std::stringstream stacktrace;
               print_stacktrace( stacktrace );
               elog( "Thread ${name} yielded in exception handler!\n${trace}",
                     ("name",thread::current().name())("trace",stacktrace.str()) );
               assert( std::current_exception() == std::exception_ptr() );
            }

            check_for_timeouts();
            if( !current )
               current = new fc::context( &fc::thread::current() );

            // saved so the reschedule path below can restore it after resume
            priority original_priority = current->prio;

            // check to see if any other contexts are ready
            if (!ready_heap.empty())
            {
               fc::context* next = ready_pop_front();
               if (next == current)
               {
                  // elog( "next == current... something went wrong" );
                  assert(next != current);
                  return false;
               }
               BOOST_ASSERT(next != current);

               // jump to next context, saving current context
               fc::context* prev = current;
               current = next;
               if (reschedule)
               {
                  // NOTE(review): line 427 is missing from this scrape —
                  // presumably it lowered prev's priority (the index mentions
                  // priority::_internal__priority_for_short_sleeps()); confirm
                  // against the original source.
                  add_context_to_ready_list(prev, true);
               }
               // slog( "jump to %p from %p", next, prev );
               // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) );
#if BOOST_VERSION >= 106100
               auto p = context_pair{nullptr, prev};
               auto t = bc::jump_fcontext( next->my_context, &p );
               // on resume, record the fcontext handle we came back from
               static_cast<context_pair*>(t.data)->second->my_context = t.fctx;
#else
               bc::jump_fcontext( &prev->my_context, next->my_context, 0 );
#endif
               BOOST_ASSERT( current );
               BOOST_ASSERT( current == prev );
               //current = prev;
            }
            else
            {
               // all contexts are blocked, create a new context
               // that will process posted tasks...
               fc::context* prev = current;

               fc::context* next = nullptr;
               if( pt_head )
               {
                  // grab cached context
                  next = pt_head;
                  pt_head = pt_head->next;
                  next->next = 0;
                  next->reinitialize();
               }
               else
               {
                  // create new context.
                  next = new fc::context( &thread_d::start_process_tasks, stack_alloc,
                                          &fc::thread::current() );
               }

               current = next;
               if( reschedule )
               {
                  // NOTE(review): line 468 is missing from this scrape —
                  // presumably the same priority adjustment as in the branch
                  // above; confirm against the original source.
                  add_context_to_ready_list(prev, true);
               }

               // slog( "jump to %p from %p", next, prev );
               // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) );
#if BOOST_VERSION >= 106100
               auto p = context_pair{this, prev};
               auto t = bc::jump_fcontext( next->my_context, &p );
               static_cast<context_pair*>(t.data)->second->my_context = t.fctx;
#else
               bc::jump_fcontext( &prev->my_context, next->my_context, (intptr_t)this );
#endif
               BOOST_ASSERT( current );
               BOOST_ASSERT( current == prev );
               //current = prev;
            }

            if (reschedule)
               current->prio = original_priority;

            if( current->canceled )
            {
               //current->canceled = false;
#ifdef NDEBUG
               FC_THROW_EXCEPTION( canceled_exception, "" );
#else
               FC_THROW_EXCEPTION( canceled_exception, "cancellation reason: ${reason}", ("reason", current->cancellation_reason ? current->cancellation_reason : "[none given]"));
#endif
            }

            return true;
         }
501 
         /// Entry point for every task-processing fiber: runs the scheduler
         /// loop (process_tasks), then parks this context on the free list and
         /// switches away. The two signatures match the pre/post-1.61
         /// Boost.Context calling conventions.
#if BOOST_VERSION >= 106100
         static void start_process_tasks( bc::transfer_t my )
         {
            auto p = static_cast<context_pair*>(my.data);
            auto self = static_cast<thread_d*>(p->first);
            // record the fcontext handle of the context we came from
            p->second->my_context = my.fctx;
#else
         static void start_process_tasks( intptr_t my )
         {
            thread_d* self = (thread_d*)my;
#endif
            try
            {
               self->process_tasks();
            }
            catch ( canceled_exception& ) { /* allowed exception */ }
            catch ( ... )
            {
               elog( "fiber ${name} exited with uncaught exception: ${e}", ("e",fc::except_str())("name", self->name) );
               // assert( !"fiber exited with uncaught exception" );
               //TODO replace error fc::cerr<<"fiber exited with uncaught exception:\n "<<
               //  boost::current_exception_diagnostic_information() <<std::endl;
            }
            // this context is finished; queue it for deletion and move on
            self->free_list.push_back(self->current);
            self->start_next_fiber( false );
         }
528 
         // NOTE(review): the signature line is missing from this scrape; per
         // the index it is: void run_next_task()
         // Dequeues the best runnable task and executes it on the current
         // context, then releases the task and recycles the context state.
         {
            task_base* next = dequeue();

            next->_set_active_context( current );
            current->cur_task = next;
            next->run();
            current->cur_task = nullptr;
            next->_set_active_context(nullptr);
            next->release(); // HERE BE DRAGONS
            current->reinitialize();
         }
541 
         // NOTE(review): the signature line is missing from this scrape; per
         // the index it is: bool has_next_task()
         // True if any work is immediately runnable: a queued task, a
         // scheduled task whose time has arrived, or an unprocessed inbox.
         {
            if( task_pqueue.size() ||
                (task_sch_queue.size() && task_sch_queue.front()->_when <= time_point::now()) ||
                task_in_queue.load( boost::memory_order_relaxed ) )
               return true;
            return false;
         }
550 
         // NOTE(review): the signature line is missing from this scrape; per
         // the index it is: void clear_free_list()
         // Deletes every context queued for disposal on free_list.
         {
            for( uint32_t i = 0; i < free_list.size(); ++i )
               delete free_list[i];
            free_list.clear();
         }
557 
         // NOTE(review): the signature line is missing from this scrape; per
         // the index it is: void process_tasks()
         // Main scheduler loop: interleaves new tasks with ready contexts by
         // priority, reaps canceled/finished work, and sleeps (under
         // task_ready_mutex) when there is nothing to do. Runs until 'done'
         // and no contexts remain blocked.
         {
            while( !done || blocked )
            {
               // move all new tasks to the task_pqueue
               move_newly_scheduled_tasks_to_task_pqueue();

               // move all now-ready sleeping tasks to the ready list
               check_for_timeouts();

               if (!task_pqueue.empty())
               {
                  if (!ready_heap.empty())
                  {
                     // a new task and an existing task are both ready to go
                     if (task_priority_less()(task_pqueue.front(), ready_heap.front()))
                     {
                        // run the existing task first
                        pt_push_back(current);
                        start_next_fiber(false);
                        continue;
                     }
                  }

                  // if we made it here, either there's no ready context, or the ready context is
                  // scheduled after the ready task, so we should run the task first
                  run_next_task();
                  continue;
               }

               // if I have something else to do other than
               // process tasks... do it.
               if (!ready_heap.empty())
               {
                  pt_push_back( current );
                  start_next_fiber(false);
                  continue;
               }

               if( process_canceled_tasks() )
                  continue;

               clear_free_list();

               { // lock scope
                  boost::unique_lock<boost::mutex> lock(task_ready_mutex);
                  // re-check under the lock to avoid missing a wakeup
                  if( has_next_task() )
                     continue;
                  time_point timeout_time = check_for_timeouts();

                  if( done )
                     return;

                  // tell any registered notifier we are about to go idle
                  detail::idle_guard guard( this );
                  if( task_in_queue.load(boost::memory_order_relaxed) )
                     continue;

                  if( timeout_time == time_point::maximum() )
                     task_ready.wait( lock );
                  else if( timeout_time != time_point::min() )
                  {
                     // there may be tasks that have been canceled we should filter them out now
                     // rather than waiting...


                     /* This bit is kind of sloppy -- this wait was originally implemented as a wait
                      * with respect to boost::chrono::system_clock.  This behaved rather comically
                      * if you were to do a:
                      *   fc::usleep(fc::seconds(60));
                      * and then set your system's clock back a month, it would sleep for a month
                      * plus a minute before waking back up (this happened on Linux, it seems
                      * Windows' behavior in this case was less unexpected).
                      *
                      * Boost Chrono's steady_clock will always increase monotonically so it will
                      * avoid this behavior.
                      *
                      * Right now we don't really have a way to distinguish when a timeout_time is coming
                      * from a function that takes a relative time like fc::usleep() vs something
                      * that takes an absolute time like fc::promise::wait_until(), so we can't always
                      * do the right thing here.
                      */
                     task_ready.wait_until( lock, boost::chrono::steady_clock::now() +
                                                  boost::chrono::microseconds(timeout_time.time_since_epoch().count() - time_point::now().time_since_epoch().count()) );
                  }
               }
            }
         }
         // NOTE(review): the signature line is missing from this scrape; per
         // the index it is: time_point check_for_timeouts()
         // Wakes sleeping/blocked contexts whose resume time has passed.
         // Returns time_point::maximum() if nothing is pending,
         // time_point::min() if something was made ready now, otherwise the
         // next wakeup time.
         {
            if( !sleep_pqueue.size() && !task_sch_queue.size() )
            {
               // ilog( "no timeouts ready" );
               return time_point::maximum();
            }

            // NOTE(review): line 658 is missing from this scrape — presumably
            // the declaration/initialization of 'next' (likely
            // time_point next = time_point::maximum();). Confirm against the
            // original source.
            if( !sleep_pqueue.empty() && next > sleep_pqueue.front()->resume_time )
               next = sleep_pqueue.front()->resume_time;
            if( !task_sch_queue.empty() && next > task_sch_queue.front()->_when )
               next = task_sch_queue.front()->_when;

            time_point now = time_point::now();
            if( now < next )
               return next;

            // move all expired sleeping tasks to the ready queue
            while( sleep_pqueue.size() && sleep_pqueue.front()->resume_time < now )
            {
               fc::context::ptr c = sleep_pqueue.front();
               std::pop_heap(sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less() );
               // ilog( "sleep pop back..." );
               sleep_pqueue.pop_back();

               if( c->blocking_prom.size() )
               {
                  // ilog( "timeout blocking prom" );
                  // NOTE(review): line 679 is missing from this scrape — per
                  // the index this presumably called
                  // c->timeout_blocking_promises(); confirm against the
                  // original source.
               }
               else
               {
                  // ilog( "..." );
                  // ilog( "ready_push_front" );
                  if (c != current)
                     add_context_to_ready_list(c);
               }
            }
            return time_point::min();
         }
691 
692  void unblock( fc::context* c )
693  {
694  if( fc::thread::current().my != this )
695  {
696  self.async( [this,c](){ unblock(c); }, "thread_d::unblock" );
697  return;
698  }
699 
700  if (c != current)
701  add_context_to_ready_list(c);
702  }
703 
         /// Sleep the current context until 'tp' (no-op if tp is within ~10ms
         /// of now), then rethrow any cancellation raised while asleep.
         /// 'reschedule' is forwarded to start_next_fiber.
         void yield_until( const time_point& tp, bool reschedule ) {
            check_fiber_exceptions();

            // sub-10ms sleeps aren't worth a context switch
            if( tp <= (time_point::now()+fc::microseconds(10000)) )
               return;

            FC_ASSERT(std::current_exception() == std::exception_ptr(),
                      "Attempting to yield while processing an exception");

            if( !current )
               current = new fc::context(&fc::thread::current());

            current->resume_time = tp;
            current->clear_blocking_promises();

            sleep_pqueue.push_back(current);
            std::push_heap( sleep_pqueue.begin(),
                            sleep_pqueue.end(), sleep_priority_less()   );

            start_next_fiber(reschedule);

            // clear current context from sleep queue...
            // (we may have been woken by cancellation rather than by
            // check_for_timeouts popping us, so search-and-remove)
            for( uint32_t i = 0; i < sleep_pqueue.size(); ++i )
            {
               if( sleep_pqueue[i] == current )
               {
                  // swap-with-last removal, then restore the heap property
                  sleep_pqueue[i] = sleep_pqueue.back();
                  sleep_pqueue.pop_back();
                  std::make_heap( sleep_pqueue.begin(),
                                  sleep_pqueue.end(), sleep_priority_less() );
                  break;
               }
            }

            current->resume_time = time_point::maximum();
            check_fiber_exceptions();
         }
741 
         /// Block the current context until promise 'p' is ready or 'timeout'
         /// passes. Throws timeout_exception if the deadline already passed;
         /// rethrows cancellation on resume.
         void wait( const promise_base::ptr& p, const time_point& timeout ) {
            if( p->ready() )
               return;

            FC_ASSERT(std::current_exception() == std::exception_ptr(),
                      "Attempting to yield while processing an exception");

            if( timeout < time_point::now() )
               FC_THROW_EXCEPTION( timeout_exception, "" );

            if( !current )
               current = new fc::context(&fc::thread::current());

            // slog( "                                 %1% blocking on %2%", current, p.get() );
            current->add_blocking_promise(p.get(),true);

            // if not max timeout, added to sleep pqueue
            if( timeout != time_point::maximum() )
            {
               current->resume_time = timeout;
               sleep_pqueue.push_back(current);
               std::push_heap( sleep_pqueue.begin(),
                               sleep_pqueue.end(),
               // NOTE(review): line 765 is missing from this scrape —
               // presumably "sleep_priority_less() );" closing this call, as
               // in yield_until above. Confirm against the original source.
            }

            // elog( "blocking %1%", current );
            add_to_blocked( current );
            // debug("switching fibers..." );


            start_next_fiber();
            // slog( "resuming %1%", current );

            // slog( "                                 %1% unblocking blocking on %2%", current, p.get() );
            current->remove_blocking_promise(p.get());

            check_fiber_exceptions();
         }
781 
         // NOTE(review): the signature line is missing from this scrape; per
         // the index it is: void cleanup_thread_specific_data()
         // Runs the registered cleanup callback (if any) for every slot of
         // both the non-task and thread-specific data tables.
         {
            for (auto iter = non_task_specific_data.begin(); iter != non_task_specific_data.end(); ++iter)
               if (iter->cleanup)
                  iter->cleanup(iter->value);

            for (auto iter = thread_specific_data.begin(); iter != thread_specific_data.end(); ++iter)
               if (iter->cleanup)
                  iter->cleanup(iter->value);
         }
792 
         // NOTE(review): the signature line is missing from this scrape; per
         // the index it is: void notify_task_has_been_canceled()
         // Moves every canceled context off the blocked list and sleep queue
         // onto the ready list so each can observe its cancellation and exit.
         {
            // unlink canceled contexts from the 'blocked' singly-linked list
            // (iterating via pointer-to-pointer to splice in place)
            for (fc::context** iter = &blocked; *iter;)
            {
               if ((*iter)->canceled)
               {
                  fc::context* next_blocked = (*iter)->next_blocked;
                  (*iter)->next_blocked = nullptr;
                  add_context_to_ready_list(*iter);
                  *iter = next_blocked;
                  continue;
               }
               iter = &(*iter)->next_blocked;
            }

            bool task_removed_from_sleep_pqueue = false;
            for (auto sleep_iter = sleep_pqueue.begin(); sleep_iter != sleep_pqueue.end();)
            {
               if ((*sleep_iter)->canceled)
               {
                  // guard against double-queuing: the context may already have
                  // been readied via the blocked list above
                  bool already_on_ready_list = std::find(ready_heap.begin(), ready_heap.end(),
                                                         *sleep_iter) != ready_heap.end();
                  if (!already_on_ready_list)
                     add_context_to_ready_list(*sleep_iter);
                  sleep_iter = sleep_pqueue.erase(sleep_iter);
                  task_removed_from_sleep_pqueue = true;
               }
               else
                  ++sleep_iter;
            }
            // erase() broke the heap invariant; rebuild it
            if (task_removed_from_sleep_pqueue)
               std::make_heap(sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less());
         }
826  };
827 } // namespace fc
std::string except_str()
Definition: exception.cpp:272
fc::context * next
Definition: context.hpp:192
time_point _when
Definition: task.hpp:62
void wait(const promise_base::ptr &p, const time_point &timeout)
Definition: thread_d.hpp:742
fc::context * pt_head
Definition: thread_d.hpp:108
const char * get_desc() const
Definition: future.cpp:26
fc::context * next_blocked
Definition: context.hpp:190
uint64_t context_posted_num
Definition: context.hpp:200
priority prio
Definition: context.hpp:185
fc::context * current
Definition: thread_d.hpp:106
void release()
Definition: task.cpp:109
void add_blocking_promise(promise_base *p, bool req=true)
Definition: context.hpp:128
void add_to_blocked(fc::context *c)
Definition: thread_d.hpp:174
std::vector< task_base * > task_pqueue
Definition: thread_d.hpp:98
std::shared_ptr< promise_base > ptr
Definition: future.hpp:63
std::vector< fc::context * > free_list
Definition: thread_d.hpp:102
uint64_t _posted_num
Task priority looks like unsupported feature.
Definition: task.hpp:60
void timeout_blocking_promises()
Definition: context.hpp:166
#define elog(FORMAT,...)
Definition: logger.hpp:129
const char * cancellation_reason
Definition: context.hpp:196
task_base * dequeue()
Definition: thread_d.hpp:318
time_point resume_time
Definition: context.hpp:188
void process_tasks()
Definition: thread_d.hpp:558
void print_stacktrace(std::ostream &out)
Definition: stacktrace.cpp:45
unsigned next_unused_task_storage_slot
Definition: thread_d.hpp:120
void clear_free_list()
Definition: thread_d.hpp:551
std::vector< detail::specific_data_info > thread_specific_data
Definition: thread_d.hpp:115
bool process_canceled_tasks()
Definition: thread_d.hpp:330
void enqueue(task_base *t)
Definition: thread_d.hpp:247
void reinitialize()
Definition: context.hpp:99
std::vector< fc::context * > sleep_pqueue
Definition: thread_d.hpp:101
void notify_task_has_been_canceled()
Definition: thread_d.hpp:793
bool start_next_fiber(bool reschedule=false)
Definition: thread_d.hpp:381
std::shared_ptr< exception > exception_ptr
Definition: exception.hpp:131
boost::thread * boost_thread
Definition: thread_d.hpp:92
std::vector< task_base * > task_sch_queue
Definition: thread_d.hpp:100
bool has_next_task()
Definition: thread_d.hpp:542
bool operator()(task_base *a, task_base *b)
Definition: thread_d.hpp:241
static time_point min()
Definition: time.hpp:49
static thread & current()
Definition: thread.cpp:125
int64_t count() const
Definition: time.hpp:28
void check_fiber_exceptions()
Definition: thread_d.hpp:358
unsigned non_preemptable_scope_count
Definition: thread_d.hpp:125
void run_next_task()
Definition: thread_d.hpp:529
#define ilog(FORMAT,...)
Definition: logger.hpp:117
task_base * cur_task
Definition: context.hpp:199
void clear_blocking_promises()
Definition: context.hpp:176
thread_d(fc::thread &s, thread_idle_notifier *n=0)
Definition: thread_d.hpp:35
boost::atomic< task_base * > task_in_queue
Definition: thread_d.hpp:97
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
Definition: exception.hpp:345
bc::fcontext_t my_context
Definition: context.hpp:182
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
Definition: exception.hpp:378
time_point check_for_timeouts()
Definition: thread_d.hpp:650
bco::protected_stack_allocator stack_allocator
Definition: context.hpp:29
task_base * _next
Definition: task.hpp:65
fc::context::ptr ready_pop_front()
Definition: thread_d.hpp:195
bool operator()(const task_base *a, const context *b) const
Definition: thread_d.hpp:219
void _set_active_context(context *)
Definition: task.cpp:87
std::string name
Definition: thread_d.hpp:105
static void start_process_tasks(intptr_t my)
Definition: thread_d.hpp:509
bool operator()(const context *a, const context *b) const
Definition: thread_d.hpp:231
bool operator()(const context *a, const task_base *b) const
Definition: thread_d.hpp:225
const microseconds & time_since_epoch() const
Definition: time.hpp:54
std::vector< detail::specific_data_info > non_task_specific_data
Definition: thread_d.hpp:119
void remove_blocking_promise(promise_base *p)
Definition: context.hpp:157
std::vector< blocked_promise > blocking_prom
Definition: context.hpp:187
void pt_push_back(fc::context *c)
Definition: thread_d.hpp:180
void cleanup_thread_specific_data()
Definition: thread_d.hpp:782
Definition: api.hpp:15
static time_point maximum()
Definition: time.hpp:48
boost::mutex task_ready_mutex
Definition: thread_d.hpp:95
fc::context * blocked
Definition: thread_d.hpp:112
uint64_t next_posted_num
Definition: thread_d.hpp:99
std::pair< thread_d *, fc::context * > context_pair
Definition: thread_d.hpp:33
void yield_until(const time_point &tp, bool reschedule)
Definition: thread_d.hpp:704
boost::condition_variable task_ready
Definition: thread_d.hpp:94
void add_context_to_ready_list(context *context_to_add, bool at_end=false)
Definition: thread_d.hpp:203
static time_point now()
Definition: time.cpp:13
fc::context * caller_context
Definition: context.hpp:183
static priority _internal__priority_for_short_sleeps()
Definition: priority.hpp:18
void unblock(fc::context *c)
Definition: thread_d.hpp:692
bool operator()(const context::ptr &a, const context::ptr &b)
Definition: thread_d.hpp:15
std::vector< fc::context * > ready_heap
Definition: thread_d.hpp:110
bool operator()(const task_base *a, const task_base *b) const
Definition: thread_d.hpp:213
thread_idle_notifier * notifier
Definition: thread_d.hpp:122
priority _prio
Definition: task.hpp:61
stack_allocator stack_alloc
Definition: thread_d.hpp:93
void run()
Definition: task.cpp:29
void move_newly_scheduled_tasks_to_task_pqueue()
Definition: thread_d.hpp:287
cerr_t & cerr
Definition: iostream.cpp:176
bool canceled
Definition: context.hpp:194