8 #if defined(_MSC_VER) && !defined(NDEBUG) 10 const DWORD MS_VC_EXCEPTION=0x406D1388;
13 typedef struct tagTHREADNAME_INFO
22 static void set_thread_name(
const char* threadName)
26 info.szName = threadName;
32 RaiseException(MS_VC_EXCEPTION, 0,
sizeof(info)/
sizeof(ULONG_PTR), (ULONG_PTR*)&info);
34 __except(EXCEPTION_EXECUTE_HANDLER)
38 #elif defined(__linux__) 40 static void set_thread_name(
const char* threadName)
42 pthread_setname_np(pthread_self(), threadName);
44 #elif defined(__APPLE__) && !defined(NDEBUG) 46 static void set_thread_name(
const char* threadName)
48 pthread_setname_np(threadName);
51 static void set_thread_name(
const char* threadName)
59 return thread::current().name().c_str();
62 return &thread::current();
69 static __thread
thread* t = NULL;
76 boost::thread* t =
new boost::thread( [
this,p,name,notifier]() {
78 set_thread_name(name.c_str());
79 this->my =
new thread_d( *
this, notifier );
87 wlog(
"unhandled exception" );
92 std::cerr <<
"unhandled exception in thread '" << name <<
"'\n";
98 wlog(
"unhandled exception" );
99 p->
set_exception( std::make_shared<unhandled_exception>(
FC_LOG_MESSAGE( warn,
"unhandled exception: ${diagnostic}", (
"diagnostic",boost::current_exception_diagnostic_information()) ) ) );
103 std::cerr <<
"unhandled exception in thread '" << name <<
"'\n";
104 std::cerr << boost::current_exception_diagnostic_information() <<
"\n";
109 my->boost_thread = t;
117 if( my && is_running() )
131 void thread::cleanup() {
138 const string& thread::name()
const 143 void thread::set_name(
const std::string& n )
147 async([
this,n](){ set_name(n); },
"set_name").
wait();
151 set_thread_name(my->name.c_str());
154 const char* thread::current_task_desc()
const 156 if (my->current && my->current->cur_task)
157 return my->current->cur_task->get_desc();
// No-op: accepts a description string `d` but the body is empty, so the argument is ignored.
161 void thread::debug(
const std::string& d ) { }
163 #if defined(__linux__) || defined(__APPLE__) 169 #if defined(__linux__) || defined(__APPLE__) 170 pthread_kill( my->boost_thread->native_handle(), sig );
180 auto t = my->boost_thread;
181 async( [
this](){quit();},
"thread::quit" );
205 BOOST_ASSERT( my->blocked == 0 );
207 for (
task_base* unstarted_task : my->task_pqueue)
208 unstarted_task->
set_exception(std::make_shared<canceled_exception>(
FC_LOG_MESSAGE(error,
"cancellation reason: thread quitting")));
209 my->task_pqueue.clear();
211 for (
task_base* scheduled_task : my->task_sch_queue)
212 scheduled_task->
set_exception(std::make_shared<canceled_exception>(
FC_LOG_MESSAGE(error,
"cancellation reason: thread quitting")));
213 my->task_sch_queue.clear();
218 for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i )
219 my->add_context_to_ready_list( my->sleep_pqueue[i] );
220 my->sleep_pqueue.clear();
228 my->add_context_to_ready_list( cur );
238 while (!my->ready_heap.empty())
240 my->start_next_fiber(
true);
241 my->check_for_timeouts();
243 my->clear_free_list();
244 my->cleanup_thread_specific_data();
256 catch( canceled_exception& e )
258 dlog(
"thread canceled: ${e}", (
"e", e.to_detail_string()) );
264 bool thread::is_running()
const 273 return my->current->prio;
279 my->check_fiber_exceptions();
280 my->start_next_fiber(reschedule);
281 my->check_fiber_exceptions();
288 my->yield_until( tp,
false );
292 for(
size_t i = 0; i < p.size(); ++i )
296 if( timeout < time_point::now() )
299 for(
auto i = p.begin(); i != p.end(); ++i )
300 ss << (*i)->get_desc() <<
", ";
308 for( uint32_t i = 0; i < p.size(); ++i )
309 my->current->add_blocking_promise(p[i].get(),
false);
312 if( timeout != time_point::maximum() )
314 my->current->resume_time = timeout;
315 my->sleep_pqueue.push_back(my->current);
316 std::push_heap( my->sleep_pqueue.begin(),
317 my->sleep_pqueue.end(),
321 my->add_to_blocked( my->current );
322 my->start_next_fiber();
324 for(
auto i = p.begin(); i != p.end(); ++i )
325 my->current->remove_blocking_promise(i->get());
327 my->check_fiber_exceptions();
329 for( uint32_t i = 0; i < p.size(); ++i )
337 async_task( t, p, time_point::min() );
340 void thread::poke() {
341 boost::unique_lock<boost::mutex> lock(my->task_ready_mutex);
342 my->task_ready.notify_one();
352 task_base* stale_head = my->task_in_queue.load(boost::memory_order_relaxed);
353 do { t->
_next = stale_head;
354 }
while( !my->task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) );
359 if(
this != &current() && !stale_head ) {
360 boost::unique_lock<boost::mutex> lock(my->task_ready_mutex);
361 my->task_ready.notify_one();
366 thread::current().yield();
369 thread::current().sleep_until( time_point::now() + u);
372 thread::current().sleep_until(tp);
377 return thread::current().exec();
382 return thread::current().wait_any_until( std::move(v), time_point::now() + timeout_us );
387 return thread::current().wait_any_until( std::move(v), tp );
395 if( timeout < time_point::now() )
401 my->current->add_blocking_promise(p.get(),
true);
404 if( timeout != time_point::maximum() )
406 my->current->resume_time = timeout;
407 my->sleep_pqueue.push_back(my->current);
408 std::push_heap( my->sleep_pqueue.begin(),
409 my->sleep_pqueue.end(),
413 my->add_to_blocked( my->current );
415 my->start_next_fiber();
417 my->current->remove_blocking_promise(p.get());
419 my->check_fiber_exceptions();
424 BOOST_ASSERT(p->ready());
427 this->
async( [=](){ notify(p); },
"notify", priority::max() );
446 for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i )
448 if( my->sleep_pqueue[i] == cur_blocked )
451 my->sleep_pqueue[i] = my->sleep_pqueue.back();
452 my->sleep_pqueue.pop_back();
457 auto cur = cur_blocked;
466 cur_blocked = my->blocked;
469 my->add_context_to_ready_list( cur );
473 prev_blocked = cur_blocked;
479 bool thread::is_current()
const 481 return this == &current();
484 void thread::notify_task_has_been_canceled()
486 async( [
this](){ my->notify_task_has_been_canceled(); },
"notify_task_has_been_canceled", priority::max() );
495 idle_guard::idle_guard(
thread_d* t ) : notifier(t->notifier)
504 work->
_next = stale_head;
505 }
while( !t->
task_in_queue.compare_exchange_weak( stale_head, work, boost::memory_order_release ) );
511 if( notifier ) notifier->
busy();
519 unhandled_exception_filter_type unhandled_structured_exception_filter =
nullptr;
521 void set_unhandled_structured_exception_filter(unhandled_exception_filter_type new_filter)
523 detail::unhandled_structured_exception_filter = new_filter;
525 unhandled_exception_filter_type get_unhandled_structured_exception_filter()
527 return detail::unhandled_structured_exception_filter;
void sleep_until(const time_point &tp)
virtual task_base * idle()=0
auto async(Functor &&f, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> fc::future< decltype(f())>
fc::context * next_blocked
const char * thread_name()
void set_exception(const fc::exception_ptr &e)
T wait(boost::signals2::signal< void(T)> &sig, const microseconds &timeout_us=microseconds::maximum())
std::shared_ptr< promise_base > ptr
Used to generate a useful error report when an exception is thrown. At each level in the stack where t...
void set_exception_on_blocking_promises(const exception_ptr &e)
const string & name() const
returns the name given by set_name() for this thread
void usleep(const microseconds &u)
bool try_unblock(promise_base *p)
const T & wait(const microseconds &timeout=microseconds::maximum())
static thread & current()
void set_value(const T &v)
boost::atomic< task_base * > task_in_queue
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
int wait_any(const fc::future< T1 > &f1, const fc::future< T2 > &f2, const microseconds timeout_us=microseconds::maximum())
std::string to_detail_string(log_level ll=log_level::all) const
std::vector< blocked_promise > blocking_prom
int wait_any_until(std::vector< promise_base::ptr > &&v, const time_point &tp)
thread *& current_thread()
boost::signals2::signal< T > signal
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
virtual std::shared_ptr< exception > dynamic_copy_exception() const
std::shared_ptr< promise< T > > ptr