#include <boost/thread.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/atomic.hpp>

// thread_d( fc::thread& s, thread_idle_notifier* n = 0 ) -- constructor (fragment):
// initializer list and the body that assigns each thread a short generated name.
:self(s), boost_thread(0),
 next_unused_task_storage_slot(0),
 ,non_preemptable_scope_count(0)

static boost::atomic<int> cnt(0);
name = std::string("th_") + char('a'+cnt++);
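
// Teardown (fragment): clear the task still attached to a ready context and detach the
// underlying boost::thread.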
if (ready_context->cur_task)

   ready_context->cur_task = nullptr;

boost_thread->detach();
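
// debug(): dumps the current context and the contexts it tracks (with their tasks and
// blocking promises) to fc::cerr.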
void debug( const std::string& s ) {

   fc::cerr<<"--------------------- "<<s.c_str()<<" - "<<current;

   fc::cerr<<" ---------------------------\n";

   if( c->cur_task ) fc::cerr<<'('<<c->cur_task->get_desc()<<')';

   if( c->cur_task ) fc::cerr<<'('<<c->cur_task->get_desc()<<')';

   for( uint32_t i = 0; i < c->blocking_prom.size(); ++i ) {
      fc::cerr<<c->blocking_prom[i].prom<<'('<<c->blocking_prom[i].prom->get_desc()<<')';
      if( i + 1 < c->blocking_prom.size() ) {

   fc::cerr<<"-------------------------------------------------\n";
fc::context* highest_priority_context = ready_heap.front();

ready_heap.pop_back();
return highest_priority_context;

ready_heap.push_back(context_to_add);
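
// enqueue(task_base* t) (fragment): walk the list of newly posted tasks, stamp each with
// a _posted_num, and push it onto task_sch_queue (scheduled for later) or task_pqueue
// (ready now), keeping both containers heap-ordered.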
unsigned num_ready_tasks = 0;

if (cur->_when <= now)

next_posted_num += num_ready_tasks;
unsigned tasks_posted = 0;

if (cur->_when > now)

   task_sch_queue.push_back(cur);
   std::push_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less());

cur->_posted_num = next_posted_num - (++tasks_posted);
task_pqueue.push_back(cur);
std::push_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less());   // comparator name assumed
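
// move_newly_scheduled_tasks_to_task_pqueue() (fragment): atomically take the list of
// tasks posted from other threads, enqueue() them, then promote every scheduled task
// whose _when has arrived from task_sch_queue into task_pqueue.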
task_base* pending_list = task_in_queue.exchange(0, boost::memory_order_seq_cst);

enqueue(pending_list);

while (!task_sch_queue.empty() &&
       task_sch_queue.front()->_when <= time_point::now())   // second half of the condition restored from the identical test used later in this file

   task_base* ready_task = task_sch_queue.front();
   std::pop_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less());
   task_sch_queue.pop_back();

   task_pqueue.push_back(ready_task);
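
// Dequeue path (fragment): the highest-priority task is popped off task_pqueue.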
assert(!task_pqueue.empty());

task_pqueue.pop_back();
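
// process_canceled_tasks() (fragment): release and erase canceled tasks from
// task_sch_queue, re-heapify if anything was removed, and report whether any were found.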
bool canceled_task = false;
for( auto task_itr = task_sch_queue.begin();
     task_itr != task_sch_queue.end();

   if( (*task_itr)->canceled() )

      (*task_itr)->release();
      task_itr = task_sch_queue.erase(task_itr);
      canceled_task = true;

std::make_heap( task_sch_queue.begin(), task_sch_queue.end(), task_when_less() );

return canceled_task;
ilog( "throwing canceled exception" );
assert(non_preemptable_scope_count == 0);

std::stringstream stacktrace;

elog( "Thread ${name} yielded in exception handler!\n${trace}",
      ("name",name)("trace",stacktrace.str()) );   // argument pairs assumed from the format string

check_for_timeouts();

if (!ready_heap.empty())

assert(next != current);

BOOST_ASSERT(next != current);

add_context_to_ready_list(prev, true);

#if BOOST_VERSION >= 106100

auto t = bc::jump_fcontext( next->my_context, &p );
static_cast<context_pair*>(t.data)->second->my_context = t.fctx;

BOOST_ASSERT( current );
BOOST_ASSERT( current == prev );
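
// Still inside the context switch (fragment): when no user context is ready, a cached
// "process tasks" context is taken from the pt_head list and jumped to; the caller's
// priority is restored afterwards.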
pt_head = pt_head->next;

add_context_to_ready_list(prev, true);

#if BOOST_VERSION >= 106100

auto t = bc::jump_fcontext( next->my_context, &p );
static_cast<context_pair*>(t.data)->second->my_context = t.fctx;

BOOST_ASSERT( current );
BOOST_ASSERT( current == prev );

current->prio = original_priority;
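
// start_process_tasks() (fragment): entry point of a worker fiber. It unpacks the
// thread_d*/context pair handed through the context switch, swallows canceled_exception,
// logs any other uncaught exception, then parks itself on free_list and yields.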
#if BOOST_VERSION >= 106100
static void start_process_tasks( bc::transfer_t my )

   auto self = static_cast<thread_d*>(p->first);
   p->second->my_context = my.fctx;

   catch ( canceled_exception& ) { }

   elog( "fiber ${name} exited with uncaught exception: ${e}",
         ("e", fc::except_str())("name", self->name) );

   self->free_list.push_back(self->current);
   self->start_next_fiber( false );
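
// has_next_task(), free-list cleanup and the main task-processing loop (fragments):
// while the thread is not done or still has blocked contexts, it pulls newly posted
// tasks in, promotes due scheduled tasks, and either runs a fresh task or resumes a
// ready context.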
if( task_pqueue.size() ||
    (task_sch_queue.size() && task_sch_queue.front()->_when <= time_point::now()) ||
    task_in_queue.load( boost::memory_order_relaxed ) )

for( uint32_t i = 0; i < free_list.size(); ++i )

while( !done || blocked )

   move_newly_scheduled_tasks_to_task_pqueue();

   check_for_timeouts();

   if (!task_pqueue.empty())

      if (!ready_heap.empty())

         pt_push_back(current);
         start_next_fiber(false);

   if (!ready_heap.empty())

      pt_push_back( current );
      start_next_fiber(false);

   if( process_canceled_tasks() )
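
// Idle handling (fragment): with task_ready_mutex held, the loop re-checks for work and
// then blocks on the task_ready condition variable, either indefinitely or until the
// next timeout reported by check_for_timeouts().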
boost::unique_lock<boost::mutex> lock(task_ready_mutex);
if( has_next_task() )

time_point timeout_time = check_for_timeouts();

if( task_in_queue.load(boost::memory_order_relaxed) )

task_ready.wait( lock );

task_ready.wait_until( lock, boost::chrono::steady_clock::now() +
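
// check_for_timeouts() (fragment): compute the earliest wake-up time from sleep_pqueue
// and task_sch_queue, move every sleeping context whose resume_time has passed back onto
// the ready list, and return the next timeout.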
if( !sleep_pqueue.size() && !task_sch_queue.size() )

if( !sleep_pqueue.empty() && next > sleep_pqueue.front()->resume_time )
   next = sleep_pqueue.front()->resume_time;
if( !task_sch_queue.empty() && next > task_sch_queue.front()->_when )
   next = task_sch_queue.front()->_when;

while( sleep_pqueue.size() && sleep_pqueue.front()->resume_time < now )

   sleep_pqueue.pop_back();

   add_context_to_ready_list(c);
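
// unblock(fc::context* c) (fragment): when called from another thread, the call is
// re-posted to this thread via self.async(); otherwise the context goes straight onto
// the ready list.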
self.async( [this,c](){ unblock(c); }, "thread_d::unblock" );

add_context_to_ready_list(c);
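
// yield_until(tp, reschedule) (fragment): refuse to yield while an exception is being
// handled, park the current context on sleep_pqueue until its resume time, switch away,
// and remove it from the sleep heap again once it resumes.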
check_fiber_exceptions();

      "Attempting to yield while processing an exception");

sleep_pqueue.push_back(current);
std::push_heap( sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less() );   // comparator name assumed

start_next_fiber(reschedule);

for( uint32_t i = 0; i < sleep_pqueue.size(); ++i )

   if( sleep_pqueue[i] == current )

      sleep_pqueue[i] = sleep_pqueue.back();
      sleep_pqueue.pop_back();
      std::make_heap( sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less() );

check_fiber_exceptions();
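
// wait(p, timeout) (fragment): the same exception check, then the current context is put
// on sleep_pqueue, added to the blocked list, and the scheduler switches away; fiber
// exceptions are re-checked once the context resumes.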
747 "Attempting to yield while processing an exception");
762 sleep_pqueue.push_back(current);
763 std::push_heap( sleep_pqueue.begin(),
769 add_to_blocked( current );
779 check_fiber_exceptions();
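
// cleanup_thread_specific_data() (fragment): invoke the registered cleanup callback for
// every non-task-specific and thread-specific data slot.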
for (auto iter = non_task_specific_data.begin(); iter != non_task_specific_data.end(); ++iter)

   iter->cleanup(iter->value);

for (auto iter = thread_specific_data.begin(); iter != thread_specific_data.end(); ++iter)

   iter->cleanup(iter->value);
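
// notify_task_has_been_canceled() (fragment): move every canceled context from the
// blocked list and from sleep_pqueue onto the ready list (unless already there) and
// re-heapify sleep_pqueue if anything was removed.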
if ((*iter)->canceled)

   add_context_to_ready_list(*iter);
   *iter = next_blocked;

bool task_removed_from_sleep_pqueue = false;
for (auto sleep_iter = sleep_pqueue.begin(); sleep_iter != sleep_pqueue.end();)

   if ((*sleep_iter)->canceled)

      bool already_on_ready_list = std::find(ready_heap.begin(), ready_heap.end(),
                                             *sleep_iter) != ready_heap.end();
      if (!already_on_ready_list)
         add_context_to_ready_list(*sleep_iter);
      sleep_iter = sleep_pqueue.erase(sleep_iter);
      task_removed_from_sleep_pqueue = true;

if (task_removed_from_sleep_pqueue)