BitShares-Core  5.0.0
BitShares blockchain implementation and command-line interface software
parallel.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 The BitShares Blockchain, and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 
25 #include <fc/thread/parallel.hpp>
28 #include <fc/asio.hpp>
29 
30 #include <boost/atomic/atomic.hpp>
31 #include <boost/lockfree/queue.hpp>
32 
33 namespace fc {
34  namespace detail {
36  {
37  public:
39  {
40  is_idle.store(false);
41  }
42 
44  {
45  id = copy.id;
46  my_pool = copy.my_pool;
47  is_idle.store( copy.is_idle.load() );
48  }
49 
50  virtual ~idle_notifier_impl() {}
51 
52  virtual task_base* idle();
53  virtual void busy()
54  {
55  is_idle.store(false);
56  }
57 
58  uint32_t id;
60  boost::atomic<bool> is_idle;
61  };
62 
63  class pool_impl
64  {
65  public:
66  explicit pool_impl( const uint16_t num_threads )
67  : idle_threads( 2 * num_threads ), waiting_tasks( 200 )
68  {
69  notifiers.resize( num_threads );
70  threads.reserve( num_threads );
71  for( uint32_t i = 0; i < num_threads; i++ )
72  {
73  notifiers[i].id = i;
74  notifiers[i].my_pool = this;
75  threads.push_back( new thread( "pool worker " + fc::to_string(i), &notifiers[i] ) );
76  }
77  }
79  {
80  for( thread* t : threads)
81  delete t; // also calls quit()
82  waiting_tasks.consume_all( [] ( task_base* t ) {
83  t->cancel( "thread pool quitting" );
84  });
85  }
86 
88  {
89  idle_notifier_impl* ini;
90  while( idle_threads.pop( ini ) )
91  if( ini->is_idle.exchange( false ) )
92  { // minor race condition here, a thread might receive a task while it's busy
93  return threads[ini->id];
94  }
95  boost::unique_lock<fc::spin_yield_lock> lock(pool_lock);
96  while( idle_threads.pop( ini ) )
97  if( ini->is_idle.exchange( false ) )
98  { // minor race condition here, a thread might receive a task while it's busy
99  return threads[ini->id];
100  }
101  while( !waiting_tasks.push( task ) )
102  elog( "Worker pool internal error" );
103  return 0;
104  }
105 
107  {
108  task_base* task;
109  if( waiting_tasks.pop( task ) )
110  return task;
111  fc::unique_lock<fc::spin_yield_lock> lock(pool_lock);
112  if( waiting_tasks.pop( task ) )
113  return task;
114  while( !idle_threads.push( ini ) )
115  elog( "Worker pool internal error" );
116  return 0;
117  }
118  private:
119  std::vector<idle_notifier_impl> notifiers;
120  std::vector<thread*> threads;
121  boost::lockfree::queue<idle_notifier_impl*> idle_threads;
122  boost::lockfree::queue<task_base*> waiting_tasks;
123  fc::spin_yield_lock pool_lock;
124  };
125 
127  {
128  is_idle.store( true );
129  task_base* result = my_pool->enqueue_idle_thread( this );
130  if( result ) is_idle.store( false );
131  return result;
132  }
133 
135  {
138  }
139 
141  {
142  delete my;
143  }
144 
146  {
147  thread* worker = my->post( task );
148  if( worker )
149  worker->async_task( task, priority() );
150  }
151 
153  {
154  static worker_pool the_pool;
155  return the_pool;
156  }
157  }
158 
159  serial_valve::ticket_guard::ticket_guard( boost::atomic<future<void>*>& latch )
160  {
161  my_promise = promise<void>::create();
162  future<void>* my_future = new future<void>( my_promise );
163  try
164  {
165  do
166  {
167  ticket = latch.load();
168  FC_ASSERT( ticket, "Valve is shutting down!" );
169  }
170  while( !latch.compare_exchange_weak( ticket, my_future ) );
171  }
172  catch (...)
173  {
174  delete my_future;
175  throw;
176  }
177  }
178 
179  serial_valve::ticket_guard::~ticket_guard()
180  {
181  my_promise->set_value();
182  ticket->wait();
183  delete ticket;
184  }
185 
186  void serial_valve::ticket_guard::wait_for_my_turn()
187  {
188  ticket->wait();
189  }
190 
192  {
193  latch.store( new future<void>( promise<void>::create( true ) ) );
194  }
195 
197  {
198  fc::future<void>* last = latch.exchange( 0 );
199  last->wait();
200  delete last;
201  }
202 } // namespace fc
static ptr create(const char *desc FC_TASK_NAME_DEFAULT_ARG)
Definition: future.hpp:114
virtual void cancel(const char *reason FC_CANCELATION_REASON_DEFAULT_ARG) override
Definition: task.cpp:64
boost::atomic< bool > is_idle
Definition: parallel.cpp:60
#define elog(FORMAT,...)
Definition: logger.hpp:129
idle_notifier_impl(const idle_notifier_impl &copy)
Definition: parallel.cpp:43
void wait(const microseconds &timeout=microseconds::maximum())
Definition: future.hpp:299
virtual task_base * idle()
Definition: parallel.cpp:126
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
Definition: exception.hpp:345
task_base * enqueue_idle_thread(idle_notifier_impl *ini)
Definition: parallel.cpp:106
static uint16_t get_num_threads()
Definition: asio.cpp:108
at< List, length< List >()-1 > last
Get the type at the end of the list.
Definition: typelist.hpp:193
pool_impl(const uint16_t num_threads)
Definition: parallel.cpp:66
std::string to_string(double)
Definition: string.cpp:73
boost::asio::io_service & default_io_service()
Definition: asio.cpp:182
Definition: api.hpp:15
void copy(const path &from, const path &to)
Definition: filesystem.cpp:241
modified spin-lock that yields on failure, but becomes a &#39;spin lock&#39; if there are no other tasks to y...
worker_pool & get_worker_pool()
Definition: parallel.cpp:152
thread * post(task_base *task)
Definition: parallel.cpp:87
void post(task_base *task)
Definition: parallel.cpp:145