quickpool 1.6.0
An easy-to-use, header-only work-stealing thread pool in C++11
quickpool.hpp
1 // Copyright 2021 Thomas Nagler (MIT License)
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a copy
4 // of this software and associated documentation files (the "Software"), to deal
5 // in the Software without restriction, including without limitation the rights
6 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7 // copies of the Software, and to permit persons to whom the Software is
8 // furnished to do so, subject to the following conditions:
9 
10 // The above copyright notice and this permission notice shall be included in
11 // all copies or substantial portions of the Software.
12 
13 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19 // SOFTWARE.
20 
21 #pragma once
22 
23 #include <algorithm>
24 #include <atomic>
25 #include <condition_variable>
26 #include <exception>
27 #include <functional>
28 #include <future>
29 #include <memory>
30 #include <mutex>
31 #include <numeric>
32 #include <thread>
33 #include <vector>
34 
35 #if (defined __linux__ || defined AFFINITY)
36 #include <pthread.h>
37 #endif
38 
39 // Layout of quickpool.hpp
40 //
41 // 1. Memory related utilities.
42 // - Memory order aliases
43 // - Class for padding bytes
44 // - Memory aligned allocation utilities
45 // - Class for cache aligned atomics
46 // - Class for load/assign atomics with relaxed order
47 // 2. Loop related utilities.
48 // - Worker class for parallel for loops
49 // 3. Scheduling utilities.
50 // - Ring buffer
51 // - Task queue
52 // - Task manager
53 // 4. Thread pool class
54 // 5. Free-standing functions (main API)
55 
57 namespace quickpool {
58 
59 // 1. --------------------------------------------------------------------------
60 
62 namespace mem {
63 
65 static constexpr std::memory_order relaxed = std::memory_order_relaxed;
66 static constexpr std::memory_order acquire = std::memory_order_acquire;
67 static constexpr std::memory_order release = std::memory_order_release;
68 static constexpr std::memory_order seq_cst = std::memory_order_seq_cst;
69 
74 namespace padding_impl {
75 
77 constexpr size_t
78 mod(size_t a, size_t b)
79 {
80  return a - b * (a / b);
81 }
82 
83 // Padding bytes from end of aligned object until next alignment point. char[]
84 // must hold at least one byte.
85 template<class T, size_t Align>
86 struct padding_bytes
87 {
88  static constexpr size_t free_space =
89  Align - mod(sizeof(std::atomic<T>), Align);
90  static constexpr size_t required = free_space > 1 ? free_space : 1;
91  char padding_[required];
92 };
93 
94 struct empty_struct
95 {};
96 
99 template<class T, size_t Align>
100 struct padding
101  : std::conditional<mod(sizeof(std::atomic<T>), Align) != 0,
102  padding_bytes<T, Align>,
103  empty_struct>::type
104 {};
105 
106 } // end namespace padding_impl
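// -------------------------------------------------------------------------
// Illustration (not part of the original header): padding_bytes<T, Align>
// reserves exactly the bytes left between the end of a std::atomic<T> and
// the next Align-byte boundary, so an atomic plus its padding always spans
// a whole number of cache lines. A compile-time check of that invariant:
static_assert((sizeof(std::atomic<int>) +
               sizeof(padding_impl::padding_bytes<int, 64>)) % 64 == 0,
              "atomic<int> plus its padding fills whole 64-byte cache lines");
// -------------------------------------------------------------------------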
107 
108 namespace aligned {
109 
110 // The alloc/dealloc mechanism closely follows
111 // https://www.boost.org/doc/libs/1_76_0/boost/align/detail/aligned_alloc.hpp
112 
113 inline void*
114 alloc(size_t alignment, size_t size) noexcept
115 {
116  // Make sure alignment is at least that of void*.
117  alignment = (alignment >= alignof(void*)) ? alignment : alignof(void*);
118 
119  // Allocate enough space required for object and a void*.
120  size_t space = size + alignment + sizeof(void*);
121  void* p = std::malloc(space);
122  if (p == nullptr) {
123  return nullptr;
124  }
125 
126  // Shift pointer to leave space for void*.
127  void* p_algn = static_cast<char*>(p) + sizeof(void*);
128  space -= sizeof(void*);
129 
130  // Shift pointer further to ensure proper alignment.
131  (void)std::align(alignment, size, p_algn, space);
132 
133  // Store unaligned pointer with offset sizeof(void*) before aligned
134  // location. Later we'll know where to look for the pointer telling
135  // us where to free what we malloc()'ed above.
136  *(static_cast<void**>(p_algn) - 1) = p;
137 
138  return p_algn;
139 }
140 
141 inline void
142 free(void* ptr) noexcept
143 {
144  if (ptr) {
145  std::free(*(static_cast<void**>(ptr) - 1));
146  }
147 }
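// -------------------------------------------------------------------------
// Usage sketch (illustration only, not part of the original header): alloc()
// over-allocates, aligns the returned pointer, and stashes the raw malloc()
// pointer just before the aligned block so that free() can recover it. The
// helper below is hypothetical and assumes size_t is wide enough to hold a
// pointer value.
inline bool
example_aligned_alloc()
{
    void* p = alloc(64, 1024); // 1 KiB block, aligned to a 64-byte boundary
    bool ok = (p != nullptr) &&
              (reinterpret_cast<size_t>(p) % 64 == 0);
    free(p); // looks up the stashed raw pointer and releases the whole block
    return ok;
}
// -------------------------------------------------------------------------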
148 
151 template<class T, std::size_t Alignment = 64>
152 class allocator : public std::allocator<T>
153 {
154  private:
155  static constexpr size_t min_align =
156  (Alignment >= alignof(void*)) ? Alignment : alignof(void*);
157 
158  public:
159  template<class U>
160  struct rebind
161  {
162  typedef allocator<U, Alignment> other;
163  };
164 
165  allocator() noexcept
166  : std::allocator<T>()
167  {}
168 
169  template<class U, std::size_t UAlignment>
170  allocator(const allocator<U, UAlignment>& other) noexcept
171  : std::allocator<T>(other)
172  {}
173 
174  T* allocate(size_t size, const void* = 0)
175  {
176  if (size == 0) {
177  return 0;
178  }
179  void* p = mem::aligned::alloc(min_align, sizeof(T) * size);
180  if (!p) {
181  throw std::bad_alloc();
182  }
183  return static_cast<T*>(p);
184  }
185 
186  void deallocate(T* ptr, size_t) { mem::aligned::free(ptr); }
187 
188  template<class U, class... Args>
189  void construct(U* ptr, Args&&... args)
190  {
191  ::new (static_cast<void *>(ptr)) U(std::forward<Args>(args)...);
192  }
193 
194  template <class U>
195  void destroy(U *ptr) noexcept
196  {
197  (void)ptr;
198  ptr->~U();
199  }
200 };
201 
205 template<class T, size_t Align = 64>
206 struct alignas(Align) atomic
207  : public std::atomic<T>
208  , private padding_impl::padding<T, Align>
209 {
210  public:
211  atomic() noexcept = default;
212 
213  atomic(T desired) noexcept
214  : std::atomic<T>(desired)
215  {}
216 
217  // Assignment operators have been deleted, must redefine.
218  T operator=(T x) noexcept { return std::atomic<T>::operator=(x); }
219  T operator=(T x) volatile noexcept { return std::atomic<T>::operator=(x); }
220 
221  static void* operator new(size_t count) noexcept
222  {
223  return mem::aligned::alloc(Align, count);
224  }
225 
226  static void operator delete(void* ptr) { mem::aligned::free(ptr); }
227 };
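// -------------------------------------------------------------------------
// Illustration (not part of the original header): because the struct is
// declared alignas(Align) and padded, neighboring atomics land on distinct
// cache lines, so per-thread counters stored side by side never false-share.
static_assert(alignof(atomic<int, 64>) == 64,
              "a padded atomic<int> starts on a 64-byte cache-line boundary");
static_assert(sizeof(atomic<int, 64>) % 64 == 0,
              "and occupies whole cache lines, so adjacent elements of an "
              "array never share a line");
// -------------------------------------------------------------------------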
228 
230 template<typename T>
231 struct relaxed_atomic : public mem::aligned::atomic<T>
232 {
233  explicit relaxed_atomic(T value)
234  : mem::aligned::atomic<T>(value)
235  {}
236 
237  operator T() const noexcept { return this->load(mem::relaxed); }
238 
239  T operator=(T desired) noexcept
240  {
241  this->store(desired, mem::relaxed);
242  return desired;
243  }
244 };
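// -------------------------------------------------------------------------
// Usage sketch (illustration only, not part of the original header): a
// relaxed_atomic reads and writes like a plain variable, but every implicit
// load and store is atomic with relaxed ordering. This fits counters that
// need atomicity without imposing inter-thread ordering. Hypothetical helper:
inline void
example_relaxed_counter()
{
    relaxed_atomic<int> hits{ 0 };
    hits = hits + 1;     // relaxed load, plain addition, relaxed store
    int snapshot = hits; // relaxed load via the implicit conversion
    (void)snapshot;
}
// -------------------------------------------------------------------------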
245 
247 template<class T, size_t Alignment = 64>
248 using vector = std::vector<T, mem::aligned::allocator<T, Alignment>>;
249 
250 } // end namespace aligned
251 
252 } // end namespace mem
253 
254 // 2. --------------------------------------------------------------------------
255 
257 namespace loop {
258 
260 struct State
261 {
262  int pos;
263  int end;
264 };
265 
276 template<typename Function>
277 struct Worker
278 {
279  Worker() {}
280  Worker(int begin, int end, Function fun)
281  : state{ State{ begin, end } }
282  , f{ fun }
283  {}
284 
285  Worker(Worker&& other)
286  : state{ other.state.load() }
287  , f{ std::forward<Function>(other.f) }
288  {}
289 
290  size_t tasks_left() const
291  {
292  State s = state.load();
293  return s.end - s.pos;
294  }
295 
296  bool done() const { return (tasks_left() == 0); }
297 
299  void run(std::shared_ptr<mem::aligned::vector<Worker>> others)
300  {
301  State s, s_old; // temporary state variables
302  do {
303  s = state.load();
304  if (s.pos < s.end) {
305  // Protect slot by trying to advance position before doing
306  // work.
307  s_old = s;
308  s.pos++;
309 
310  // Another worker might have changed the end of the range in
311  // the meanwhile. Check atomically if the state is unaltered
312  // and, if so, replace by advanced state.
313  if (state.compare_exchange_weak(s_old, s)) {
314  f(s_old.pos); // succeeded, do work
315  } else {
316  continue; // failed, try again
317  }
318  }
319  if (s.pos == s.end) {
320  // Reached end of own range, steal range from others. Range
321  // remains empty if all work is done, so we can leave the
322  // loop.
323  this->steal_range(*others);
324  }
325  } while (!this->done());
326  }
327 
329  void steal_range(mem::aligned::vector<Worker>& workers)
330  {
331  do {
332  Worker& other = find_victim(workers);
333  State s = other.state.load();
334  if (s.pos >= s.end) {
335  continue; // other range is empty by now
336  }
337 
338  // Remove second half of the range. Check atomically if the
339  // state is unaltered and, if so, replace with reduced range.
340  auto s_old = s;
341  s.end -= (s.end - s.pos + 1) / 2;
342  if (other.state.compare_exchange_weak(s_old, s)) {
343  // succeeded, update own range
344  state = State{ s.end, s_old.end };
345  break;
346  }
347  } while (!all_done(workers)); // failed steal, try again
348  }
349 
351  bool all_done(const mem::aligned::vector<Worker>& workers)
352  {
353  for (const auto& worker : workers) {
354  if (!worker.done())
355  return false;
356  }
357  return true;
358  }
359 
363  Worker& find_victim(mem::aligned::vector<Worker>& workers)
364  {
365  std::vector<size_t> tasks_left;
366  tasks_left.reserve(workers.size());
367  for (const auto& worker : workers) {
368  tasks_left.push_back(worker.tasks_left());
369  }
370  auto max_it = std::max_element(tasks_left.begin(), tasks_left.end());
371  auto idx = std::distance(tasks_left.begin(), max_it);
372  return workers[idx];
373  }
374 
375  mem::aligned::relaxed_atomic<State> state;
376  Function f; //!< function applied to the loop index
377 };
378 
382 template<typename Function>
383 std::shared_ptr<mem::aligned::vector<Worker<Function>>>
384 create_workers(const Function& f, int begin, int end, size_t num_workers)
385 {
386  auto num_tasks = std::max(end - begin, static_cast<int>(0));
387  num_workers = std::max(num_workers, static_cast<size_t>(1));
388  auto workers = new mem::aligned::vector<Worker<Function>>;
389  workers->reserve(num_workers);
390  for (size_t i = 0; i < num_workers; i++) {
391  workers->emplace_back(begin + num_tasks * i / num_workers,
392  begin + num_tasks * (i + 1) / num_workers,
393  f);
394  }
395  return std::shared_ptr<mem::aligned::vector<Worker<Function>>>(
396  std::move(workers));
397 }
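// -------------------------------------------------------------------------
// Usage sketch (illustration only, not part of the original header): the
// Worker/create_workers machinery can also be driven by plain std::threads.
// Each thread runs one worker over its slice of [begin, end); a worker whose
// slice is exhausted steals half of the largest remaining slice from a
// sibling. The helper below is hypothetical.
template<typename Function>
inline void
example_run_workers(const Function& f, int begin, int end, size_t n_threads)
{
    n_threads = std::max(n_threads, static_cast<size_t>(1));
    auto workers = create_workers(f, begin, end, n_threads);
    std::vector<std::thread> threads;
    for (size_t k = 0; k < n_threads; ++k) {
        // every thread holds a shared_ptr to the worker vector for stealing
        threads.emplace_back([=] { workers->at(k).run(workers); });
    }
    for (auto& t : threads)
        t.join();
}
// Possible call: example_run_workers([](int i) { /* use index i */ }, 0, 100, 4);
// -------------------------------------------------------------------------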
398 
399 } // end namespace loop
400 
401 // 3. -------------------------------------------------------------------------
402 
404 namespace sched {
405 
407 template<typename T>
408 class RingBuffer
409 {
410  public:
411  explicit RingBuffer(size_t capacity)
412  : buffer_{ std::unique_ptr<T[]>(new T[capacity]) }
413  , capacity_{ capacity }
414  , mask_{ capacity - 1 }
415  {}
416 
417  size_t capacity() const { return capacity_; }
418 
419  void set_entry(size_t i, T val) { buffer_[i & mask_] = val; }
420 
421  T get_entry(size_t i) const { return buffer_[i & mask_]; }
422 
423  RingBuffer<T>* enlarged_copy(size_t bottom, size_t top) const
424  {
425  RingBuffer<T>* new_buffer = new RingBuffer{ 2 * capacity_ };
426  for (size_t i = top; i != bottom; ++i)
427  new_buffer->set_entry(i, this->get_entry(i));
428  return new_buffer;
429  }
430 
431  private:
432  std::unique_ptr<T[]> buffer_;
433  size_t capacity_;
434  size_t mask_;
435 };
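// -------------------------------------------------------------------------
// Usage sketch (illustration only, not part of the original header): entries
// are addressed with `i & mask_`, which assumes a power-of-two capacity (the
// queue below starts at 256 and only ever doubles). Indices grow
// monotonically and simply wrap around modulo the capacity. Hypothetical
// helper:
inline void
example_ring_buffer()
{
    RingBuffer<int> buf(4); // capacity 4, so the mask is 3
    buf.set_entry(0, 10);   // slot 0
    buf.set_entry(5, 42);   // 5 & 3 == 1, stored in slot 1
    int from_slot_1 = buf.get_entry(1); // reads 42
    (void)from_slot_1;
}
// -------------------------------------------------------------------------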
436 
438 class TaskQueue
439 {
440  using Task = std::function<void()>;
441 
442  public:
444  TaskQueue(size_t capacity = 256)
445  : buffer_{ new RingBuffer<Task*>(capacity) }
446  {}
447 
448  ~TaskQueue() noexcept
449  {
450  // Must free memory allocated by push(), but not freed by try_pop().
451  auto buf_ptr = buffer_.load();
452  for (int i = top_; i < bottom_.load(mem::relaxed); ++i)
453  delete buf_ptr->get_entry(i);
454  delete buf_ptr;
455  }
456 
457  TaskQueue(TaskQueue const& other) = delete;
458  TaskQueue& operator=(TaskQueue const& other) = delete;
459 
461  bool empty() const
462  {
463  return (bottom_.load(mem::relaxed) <= top_.load(mem::relaxed));
464  }
465 
468  void push(Task&& task)
469  {
470  // Must hold lock in case of multiple producers.
471  std::unique_lock<std::mutex> lk(mutex_);
472  auto b = bottom_.load(mem::relaxed);
473  auto t = top_.load(mem::acquire);
474  RingBuffer<Task*>* buf_ptr = buffer_.load(mem::relaxed);
475 
476  if (static_cast<int>(buf_ptr->capacity()) < (b - t) + 1) {
477  // Buffer is full, create enlarged copy before continuing.
478  auto old_buf = buf_ptr;
479  buf_ptr = std::move(buf_ptr->enlarged_copy(b, t));
480  old_buffers_.emplace_back(old_buf);
481  buffer_.store(buf_ptr, mem::relaxed);
482  }
483 
485  buf_ptr->set_entry(b, new Task{ std::forward<Task>(task) });
486  bottom_.store(b + 1, mem::release);
487 
488  lk.unlock(); // can release before signal
489  cv_.notify_one();
490  }
491 
493  bool try_pop(Task& task)
494  {
495  auto t = top_.load(mem::acquire);
496  std::atomic_thread_fence(mem::seq_cst);
497  auto b = bottom_.load(mem::acquire);
498 
499  if (t < b) {
500  // Must load task pointer before acquiring the slot, because it
501  // could be overwritten immediately after.
502  auto task_ptr = buffer_.load(mem::acquire)->get_entry(t);
503 
504  // Atomically try to advance top.
505  if (top_.compare_exchange_strong(
506  t, t + 1, mem::seq_cst, mem::relaxed)) {
507  task = std::move(*task_ptr); // won race, get task
508  delete task_ptr; // free memory allocated in push()
509  return true;
510  }
511  }
512  return false; // queue is empty or lost race
513  }
514 
516  void wait()
517  {
518  std::unique_lock<std::mutex> lk(mutex_);
519  cv_.wait(lk, [this] { return !this->empty() || stopped_; });
520  }
521 
523  void stop()
524  {
525  {
526  std::lock_guard<std::mutex> lk(mutex_);
527  stopped_ = true;
528  }
529  cv_.notify_one();
530  }
531 
532  void wake_up()
533  {
534  {
535  std::lock_guard<std::mutex> lk(mutex_);
536  }
537  cv_.notify_one();
538  }
539 
540  private:
542  mem::aligned::atomic<int> top_{ 0 };
543  mem::aligned::atomic<int> bottom_{ 0 };
544 
546  std::atomic<RingBuffer<Task*>*> buffer_{ nullptr };
547 
549  std::vector<std::unique_ptr<RingBuffer<Task*>>> old_buffers_;
550 
552  std::mutex mutex_;
553  std::condition_variable cv_;
554  bool stopped_{ false };
555 };
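// -------------------------------------------------------------------------
// Usage sketch (illustration only, not part of the original header):
// producers push to the bottom under the mutex, while any thread may
// try_pop() from the top; try_pop() returns false when the queue is empty or
// the caller lost the race for a task. Hypothetical helper:
inline void
example_task_queue()
{
    TaskQueue queue;                    // default capacity of 256 slots
    queue.push([] { /* some work */ }); // stores the task, wakes a waiter
    std::function<void()> task;
    while (queue.try_pop(task)) {
        task();                         // drain everything currently queued
    }
}
// -------------------------------------------------------------------------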
556 
558 class TaskManager
559 {
560  public:
561  explicit TaskManager(size_t num_queues)
562  : queues_(num_queues)
563  , num_queues_(num_queues)
564  , owner_id_(std::this_thread::get_id())
565  {}
566 
567  TaskManager& operator=(TaskManager&& other)
568  {
569  std::swap(queues_, other.queues_);
570  num_queues_ = other.num_queues_;
571  status_ = other.status_.load();
572  num_waiting_ = other.num_waiting_.load();
573  push_idx_ = other.push_idx_.load();
574  todo_ = other.todo_.load();
575  return *this;
576  }
577 
578  void resize(size_t num_queues)
579  {
580  num_queues_ = std::max(num_queues, static_cast<size_t>(1));
581  if (num_queues > queues_.size()) {
582  queues_ = mem::aligned::vector<TaskQueue>(num_queues);
583  // thread pool must have stopped the manager, reset
584  num_waiting_ = 0;
585  todo_ = 0;
586  status_ = Status::running;
587  }
588  }
589 
590  template<typename Task>
591  void push(Task&& task)
592  {
593  rethrow_exception(); // push() throws if a task has errored.
594  if (is_running()) {
595  todo_.fetch_add(1, mem::release);
596  queues_[push_idx_++ % num_queues_].push(task);
597  }
598  }
599 
600  template<typename Task>
601  bool try_pop(Task& task, size_t worker_id = 0)
602  {
603  // Always start pop cycle at own queue to avoid contention.
604  for (size_t k = 0; k <= num_queues_; k++) {
605  if (queues_[(worker_id + k) % num_queues_].try_pop(task)) {
606  if (is_running()) {
607  return true;
608  } else {
609  // Throw away task if pool has stopped or errored.
610  return false;
611  }
612  }
613  }
614 
615  return false;
616  }
617 
618  void wake_up_all_workers()
619  {
620  for (auto& q : queues_)
621  q.wake_up();
622  }
623 
624  void wait_for_jobs(size_t id)
625  {
626  if (has_errored()) {
627  // Main thread may be waiting to reset the pool.
628  std::lock_guard<std::mutex> lk(mtx_);
629  if (++num_waiting_ == queues_.size())
630  cv_.notify_all();
631  } else {
632  ++num_waiting_;
633  }
634 
635  queues_[id].wait();
636  --num_waiting_;
637  }
638 
640  void wait_for_finish(size_t millis = 0)
641  {
642  if (called_from_owner_thread() && is_running()) {
643  auto wake_up = [this] { return (todo_ <= 0) || !is_running(); };
644  std::unique_lock<std::mutex> lk(mtx_);
645  if (millis == 0) {
646  cv_.wait(lk, wake_up);
647  } else {
648  cv_.wait_for(lk, std::chrono::milliseconds(millis), wake_up);
649  }
650  }
651  rethrow_exception();
652  }
653 
654  bool called_from_owner_thread() const
655  {
656  return (std::this_thread::get_id() == owner_id_);
657  }
658 
659  void report_success()
660  {
661 
662  auto n = todo_.fetch_sub(1, mem::release) - 1;
663  if (n == 0) {
664  // all jobs are done; lock before signal to prevent spurious
665  // failure
666  {
667  std::lock_guard<std::mutex> lk{ mtx_ };
668  }
669  cv_.notify_all();
670  }
671  }
672 
673  void report_fail(std::exception_ptr err_ptr)
674  {
675  std::lock_guard<std::mutex> lk(mtx_);
676  if (has_errored()) // only catch first exception
677  return;
678  err_ptr_ = err_ptr;
679  status_ = Status::errored;
680 
681  // Some threads may change todo_ after we stop. The large
682  // negative number forces them to exit the processing loop.
683  todo_.store(std::numeric_limits<int>::min() / 2);
684  cv_.notify_all();
685  }
686 
687  void stop()
688  {
689  {
690  std::lock_guard<std::mutex> lk(mtx_);
691  status_ = Status::stopped;
692  }
693  // Worker threads wait on queue-specific mutex -> notify all queues.
694  for (auto& q : queues_)
695  q.stop();
696  }
697 
698  void rethrow_exception()
699  {
700  // Exceptions are only thrown from the owner thread, not in workers.
701  if (called_from_owner_thread() && has_errored()) {
702  {
703  // Wait for all threads to idle so we can clean up after
704  // them.
705  std::unique_lock<std::mutex> lk(mtx_);
706  cv_.wait(lk, [this] { return num_waiting_ == queues_.size(); });
707  }
708  // Before throwing: restore defaults for potential future use of
709  // the task manager.
710  todo_ = 0;
711  auto current_exception = err_ptr_;
712  err_ptr_ = nullptr;
713  status_ = Status::running;
714 
715  std::rethrow_exception(current_exception);
716  }
717  }
718 
719  bool is_running() const
720  {
721  return status_.load(mem::relaxed) == Status::running;
722  }
723 
724  bool has_errored() const
725  {
726  return status_.load(mem::relaxed) == Status::errored;
727  }
728 
729  bool stopped() const
730  {
731  return status_.load(mem::relaxed) == Status::stopped;
732  }
733 
734  bool done() const { return (todo_.load(mem::relaxed) <= 0); }
735 
736  private:
738  mem::aligned::vector<TaskQueue> queues_;
739  size_t num_queues_;
740 
742  mem::aligned::relaxed_atomic<size_t> num_waiting_{ 0 };
743  mem::aligned::relaxed_atomic<size_t> push_idx_{ 0 };
744  mem::aligned::atomic<int> todo_{ 0 };
745 
747  const std::thread::id owner_id_;
748  enum class Status
749  {
750  running,
751  errored,
752  stopped
753  };
754  mem::aligned::atomic<Status> status_{ Status::running };
755  std::mutex mtx_;
756  std::condition_variable cv_;
757  std::exception_ptr err_ptr_{ nullptr };
758 };
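// -------------------------------------------------------------------------
// Usage sketch (illustration only, not part of the original header): the
// manager spreads pushed tasks round-robin over the per-worker queues;
// workers pop starting at their own queue and then steal from the others.
// Every finished task must be acknowledged with report_success() so that
// wait_for_finish() can return once the todo counter reaches zero.
// Hypothetical helper, executed entirely on the owner thread:
inline void
example_task_manager()
{
    TaskManager manager(2);                      // two worker queues
    manager.push([] { /* job for a worker */ });

    std::function<void()> task;
    if (manager.try_pop(task, /* worker_id */ 0)) {
        task();
        manager.report_success();                // decrements the todo counter
    }
    manager.wait_for_finish();                   // returns once all work is done
}
// -------------------------------------------------------------------------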
759 
760 // find out which cores are allowed for use by pthread
761 inline std::vector<size_t>
762 get_avail_cores()
763 {
764  auto ncores = std::thread::hardware_concurrency();
765  std::vector<size_t> avail_cores;
766  avail_cores.reserve(ncores);
767 #if (defined __linux__)
768  cpu_set_t cpuset;
769  int rc = pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
770  if (rc != 0) {
771  throw std::runtime_error("Error calling pthread_getaffinity_np");
772  }
773  for (size_t id = 0; id < ncores; id++) {
774  if (CPU_ISSET(id, &cpuset)) {
775  avail_cores.push_back(id);
776  }
777  }
778 #endif
779  return avail_cores;
780 }
781 
782 inline size_t
783 num_cores_avail()
784 {
785 #if (defined __linux__)
786  return get_avail_cores().size();
787 #endif
788  return std::thread::hardware_concurrency();
789 }
790 
791 } // end namespace sched
792 
793 // 4. ------------------------------------------------------------------------
794 
795 //! A work stealing thread pool.
796 class ThreadPool
797 {
798  public:
802  explicit ThreadPool(size_t threads = sched::num_cores_avail())
803  : task_manager_{ threads }
804  {
805  set_active_threads(threads);
806  }
807 
808  ~ThreadPool()
809  {
810  task_manager_.stop();
811  join_threads();
812  }
813 
814  ThreadPool(ThreadPool&&) = delete;
815  ThreadPool(const ThreadPool&) = delete;
816  ThreadPool& operator=(const ThreadPool&) = delete;
817  ThreadPool& operator=(ThreadPool&& other) = delete;
818 
819  //! returns a reference to the global thread pool instance.
820  static ThreadPool& global_instance()
821  {
822 #ifdef _WIN32
823  // Must leak the resource, because Windows + R deadlocks otherwise.
824  // Memory is released on shutdown.
825  static auto ptr = new ThreadPool;
826  return *ptr;
827 #else
828  static ThreadPool instance_;
829  return instance_;
830 #endif
831  }
832 
836  void set_active_threads(size_t threads)
837  {
838  if (!task_manager_.called_from_owner_thread())
839  return;
840 
841  if (threads <= workers_.size()) {
842  task_manager_.resize(threads);
843  } else {
844  if (workers_.size() > 0) {
845  task_manager_.stop();
846  join_threads();
847  }
848  workers_ = std::vector<std::thread>{ threads };
849  task_manager_ = quickpool::sched::TaskManager{ threads };
850  for (size_t id = 0; id < threads; ++id) {
851  add_worker(id);
852  }
853 #if (defined __linux__)
854  set_thread_affinity();
855 #endif
856  }
857  active_threads_ = threads;
858  }
859 
861  size_t get_active_threads() const { return active_threads_; }
862 
866  template<class Function, class... Args>
867  void push(Function&& f, Args&&... args)
868  {
869  if (active_threads_ == 0)
870  return f(args...);
871  task_manager_.push(
872  std::bind(std::forward<Function>(f), std::forward<Args>(args)...));
873  }
874 
880  template<class Function, class... Args>
881  auto async(Function&& f, Args&&... args)
882  -> std::future<decltype(f(args...))>
883  {
884  auto pack =
885  std::bind(std::forward<Function>(f), std::forward<Args>(args)...);
886  using pack_t = std::packaged_task<decltype(f(args...))()>;
887  auto task_ptr = std::make_shared<pack_t>(std::move(pack));
888  this->push([task_ptr] { (*task_ptr)(); });
889  return task_ptr->get_future();
890  }
891 
901  template<class UnaryFunction>
902  void parallel_for(int begin, int end, UnaryFunction f)
903  {
904  // each worker has its dedicated range, but can steal part of
905  // another worker's range when done with its own
906  auto n = std::max(this->get_active_threads(), static_cast<size_t>(1));
907  auto workers = loop::create_workers<UnaryFunction>(f, begin, end, n);
908  for (int k = 0; k < n; k++) {
909  this->push([=] { workers->at(k).run(workers); });
910  }
911  this->wait();
912  }
913 
923  template<class Items, class UnaryFunction>
924  inline void parallel_for_each(Items& items, UnaryFunction f)
925  {
926  auto begin = std::begin(items);
927  auto size = std::distance(begin, std::end(items));
928  this->parallel_for(0, size, [=](int i) { f(begin[i]); });
929  }
930 
935  void wait(size_t millis = 0) { task_manager_.wait_for_finish(millis); }
936 
938  bool done() const { return task_manager_.done(); }
939 
941  static void* operator new(size_t count)
942  {
943  return mem::aligned::alloc(alignof(ThreadPool), count);
944  }
945 
947  static void operator delete(void* ptr) { mem::aligned::free(ptr); }
948 
949  private:
951  void join_threads()
952  {
953  for (auto& worker : workers_) {
954  if (worker.joinable())
955  worker.join();
956  }
957  }
958 
961  void add_worker(size_t id)
962  {
963  workers_[id] = std::thread([&, id] {
964  std::function<void()> task;
965  while (!task_manager_.stopped()) {
966  task_manager_.wait_for_jobs(id);
967  do {
968  // inner while to save some time calling done()
969  while (task_manager_.try_pop(task, id))
970  this->execute_safely(task);
971  } while (!task_manager_.done());
972  }
973  });
974  }
975 
976 #if (defined __linux__)
978  void set_thread_affinity()
979  {
980  cpu_set_t cpuset;
981  auto avail_cores = sched::get_avail_cores();
982  for (size_t id = 0; id < workers_.size(); id++) {
983  CPU_ZERO(&cpuset);
984  CPU_SET(avail_cores[id % avail_cores.size()], &cpuset);
985  int rc = pthread_setaffinity_np(
986  workers_.at(id).native_handle(), sizeof(cpu_set_t), &cpuset);
987  if (rc != 0) {
988  throw std::runtime_error(
989  "Error calling pthread_setaffinity_np");
990  }
991  }
992  }
993 #endif
994 
995  void execute_safely(std::function<void()>& task)
996  {
997  try {
998  task();
999  task_manager_.report_success();
1000  } catch (...) {
1001  task_manager_.report_fail(std::current_exception());
1002  }
1003  }
1004 
1005  sched::TaskManager task_manager_;
1006  std::vector<std::thread> workers_;
1007  std::atomic_size_t active_threads_;
1008 };
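// -------------------------------------------------------------------------
// Usage sketch (illustration only, not part of the original header): a local
// pool owns its worker threads; push() schedules fire-and-forget jobs,
// async() returns a std::future, parallel_for() splits an index range across
// the workers, and wait() blocks the owning thread until everything is done.
// Hypothetical helper:
inline void
example_thread_pool()
{
    ThreadPool pool(2);                           // two worker threads

    pool.push([] { /* fire-and-forget job */ });
    auto fut = pool.async([] { return 40 + 2; }); // std::future<int>

    std::vector<int> squares(100);
    pool.parallel_for(0, 100, [&](int i) { squares[i] = i * i; });

    pool.wait();            // all pushed jobs have finished
    int answer = fut.get(); // 42
    (void)answer;
}                           // destructor stops and joins the workers
// -------------------------------------------------------------------------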
1009 
1010 // 5. ---------------------------------------------------
1011 
1013 
1017 template<class Function, class... Args>
1018 inline void
1019 push(Function&& f, Args&&... args)
1020 {
1021  ThreadPool::global_instance().push(std::forward<Function>(f),
1022  std::forward<Args>(args)...);
1023 }
1024 
1030 template<class Function, class... Args>
1031 inline auto
1032 async(Function&& f, Args&&... args) -> std::future<decltype(f(args...))>
1033 {
1034  return ThreadPool::global_instance().async(std::forward<Function>(f),
1035  std::forward<Args>(args)...);
1036 }
1037 
1040 inline void
1041 wait()
1042 {
1043  ThreadPool::global_instance().wait();
1044 }
1045 
1047 inline bool
1048 done()
1049 {
1050  return ThreadPool::global_instance().done();
1051 }
1052 
1056 inline void
1057 set_active_threads(size_t threads)
1058 {
1059  ThreadPool::global_instance().set_active_threads(threads);
1060 }
1061 
1064 inline size_t
1065 get_active_threads()
1066 {
1067  return ThreadPool::global_instance().get_active_threads();
1068 }
1069 
1079 template<class UnaryFunction>
1080 inline void
1081 parallel_for(int begin, int end, UnaryFunction&& f)
1082 {
1083  ThreadPool::global_instance().parallel_for(
1084  begin, end, std::forward<UnaryFunction>(f));
1085 }
1086 
1096 template<class Items, class UnaryFunction>
1097 inline void
1098 parallel_for_each(Items& items, UnaryFunction&& f)
1099 {
1100  ThreadPool::global_instance().parallel_for_each(
1101  items, std::forward<UnaryFunction>(f));
1102 }
1103 
1104 } // end namespace quickpool
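A minimal, self-contained usage sketch of the free-standing API (illustration only; this example program is not part of quickpool.hpp):

#include "quickpool.hpp"

#include <iostream>
#include <vector>

int main()
{
    // fire-and-forget job on the global pool
    quickpool::push([] { std::cout << "hello from the pool\n"; });

    // job with a result
    auto fut = quickpool::async([](int a, int b) { return a + b; }, 40, 2);

    // index-based parallel loop
    std::vector<int> x(1000, 1);
    quickpool::parallel_for(0, 1000, [&](int i) { x[i] *= 2; });

    // iterator-based parallel loop
    quickpool::parallel_for_each(x, [](int& xi) { xi += 1; });

    quickpool::wait();              // wait for the push()'ed jobs
    std::cout << fut.get() << "\n"; // prints 42
    return 0;
}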