quickpool  1.8.0
An easy-to-use, header-only work-stealing thread pool in C++11
quickpool.hpp
1 // Copyright 2026 Thomas Nagler (MIT License)
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a copy
4 // of this software and associated documentation files (the "Software"), to deal
5 // in the Software without restriction, including without limitation the rights
6 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7 // copies of the Software, and to permit persons to whom the Software is
8 // furnished to do so, subject to the following conditions:
9 
10 // The above copyright notice and this permission notice shall be included in
11 // all copies or substantial portions of the Software.
12 
13 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19 // SOFTWARE.
20 
21 #pragma once
22 
23 #include <algorithm>
24 #include <atomic>
25 #include <condition_variable>
26 #include <exception>
27 #include <functional>
28 #include <future>
29 #include <iterator>
30 #include <limits>
31 #include <memory>
32 #include <mutex>
33 #include <numeric>
34 #include <stdexcept>
35 #include <thread>
36 #include <tuple>
37 #include <type_traits>
38 #include <utility>
39 #include <vector>
40 
41 #if (defined __linux__ || defined AFFINITY)
42 #include <pthread.h>
43 #endif
44 
45 #if __cplusplus >= 201703L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201703L)
46 #define QUICKPOOL_HAS_CPP17 1
47 #else
48 #define QUICKPOOL_HAS_CPP17 0
49 #endif
50 
51 // Layout of quickpool.hpp
52 //
53 // 1. Memory related utilities.
54 // - Memory order aliases
55 // - Class for padding bytes
56 // - Memory aligned allocation utilities
57 // - Class for cache aligned atomics
58 // - Class for load/assign atomics with relaxed order
59 // 2. Loop related utilities.
60 // - Worker class for parallel for loops
61 // 3. Scheduling utilities.
62 // - Ring buffer
63 // - Task queue
64 // - Task manager
65 // 4. Thread pool class
66 // 5. Free-standing functions (main API)
67 
69 namespace quickpool {
70 
namespace detail {

//! @brief packages a callable and its arguments into a nullary callable.
//!
//! `type` is the result type of invoking `Function` with `Args...`.
//! `make()` stores decayed copies of the callable and its arguments (lambda +
//! tuple with C++17, `std::bind` otherwise) and returns an object that calls
//! the stored callable on the stored arguments.
template<class Function, class... Args>
struct Task
{
#if QUICKPOOL_HAS_CPP17
    using type = std::invoke_result_t<Function, Args...>;

    static auto make(Function&& f, Args&&... args)
    {
        return [fn = std::forward<Function>(f),
                bound = std::make_tuple(std::forward<Args>(args)...)]() mutable {
            return std::apply(fn, bound);
        };
    }
#else
    using type = decltype(std::declval<Function>()(std::declval<Args>()...));

    static auto make(Function&& f, Args&&... args)
      -> decltype(std::bind(std::declval<Function>(), std::declval<Args>()...))
    {
        return std::bind(std::forward<Function>(f), std::forward<Args>(args)...);
    }
#endif
};

} // namespace detail
100 
101 // 1. --------------------------------------------------------------------------
102 
104 namespace mem {
105 
// Short aliases for the std::memory_order enumerators used in this file.
static constexpr auto relaxed = std::memory_order_relaxed;
static constexpr auto acquire = std::memory_order_acquire;
static constexpr auto release = std::memory_order_release;
static constexpr auto seq_cst = std::memory_order_seq_cst;
111 
116 namespace padding_impl {
117 
119 constexpr size_t
120 mod(size_t a, size_t b)
121 {
122  return a - b * (a / b);
123 }
124 
125 // Padding bytes from end of aligned object until next alignment point. char[]
126 // must hold at least one byte.
127 template<class T, size_t Align>
128 struct padding_bytes
129 {
130  static constexpr size_t free_space =
131  Align - mod(sizeof(std::atomic<T>), Align);
132  static constexpr size_t required = free_space > 1 ? free_space : 1;
133  char padding_[required];
134 };
135 
136 struct empty_struct
137 {};
138 
141 template<class T, size_t Align>
142 struct padding
143  : std::conditional<mod(sizeof(std::atomic<T>), Align) != 0,
144  padding_bytes<T, Align>,
145  empty_struct>::type
146 {};
147 
148 } // end namespace padding_impl
149 
150 namespace aligned {
151 
152 // The alloc/dealloc mechanism is pretty much
153 // https://www.boost.org/doc/libs/1_76_0/boost/align/detail/aligned_alloc.hpp
154 
//! @brief allocates `size` bytes aligned to `alignment`.
//!
//! The original `std::malloc()` pointer is stored in the `sizeof(void*)`
//! bytes immediately before the returned address; release the memory with
//! `mem::aligned::free()`.
//! @param alignment requested alignment, raised to at least `alignof(void*)`
//!   (presumably a power of two, as `std::align` requires — TODO confirm at
//!   call sites).
//! @param size number of bytes to allocate.
//! @return the aligned pointer, or `nullptr` if the allocation fails or the
//!   request is too large to be represented.
inline void*
alloc(size_t alignment, size_t size) noexcept
{
    // Make sure alignment is at least that of void*.
    alignment = (alignment >= alignof(void*)) ? alignment : alignof(void*);

    // Reject requests whose total size (object + worst-case alignment shift +
    // stored void*) would wrap around size_t.
    const size_t max_size = std::numeric_limits<size_t>::max();
    if (alignment > max_size - sizeof(void*) ||
        size > max_size - sizeof(void*) - alignment) {
        return nullptr;
    }

    // Allocate enough space required for object and a void*.
    size_t space = size + alignment + sizeof(void*);
    void* p = std::malloc(space);
    if (p == nullptr) {
        return nullptr;
    }

    // Shift pointer to leave space for void*.
    void* p_algn = static_cast<char*>(p) + sizeof(void*);
    space -= sizeof(void*);

    // Shift pointer further to ensure proper alignment. The reserved slack
    // makes this succeed for valid alignments; if it still fails, do not hand
    // back a misaligned block.
    if (std::align(alignment, size, p_algn, space) == nullptr) {
        std::free(p);
        return nullptr;
    }

    // Store unaligned pointer with offset sizeof(void*) before aligned
    // location, so free() knows what to release.
    *(static_cast<void**>(p_algn) - 1) = p;

    return p_algn;
}
182 
//! @brief releases memory obtained from `mem::aligned::alloc()`.
//!
//! Reads the original `std::malloc()` pointer stored just before `ptr` and
//! passes it to `std::free()`. Passing `nullptr` is a no-op.
inline void
free(void* ptr) noexcept
{
    if (ptr == nullptr) {
        return;
    }
    void* const original = static_cast<void**>(ptr)[-1];
    std::free(original);
}
190 
193 template<class T, std::size_t Alignment = 64>
194 class allocator : public std::allocator<T>
195 {
196  private:
197  static constexpr size_t min_align =
198  (Alignment >= alignof(void*)) ? Alignment : alignof(void*);
199 
200  public:
201  template<class U>
202  struct rebind
203  {
204  typedef allocator<U, Alignment> other;
205  };
206 
207  allocator() noexcept
208  : std::allocator<T>()
209  {}
210 
211  template<class U, std::size_t UAlignment>
212  allocator(const allocator<U, UAlignment>& other) noexcept
213  : std::allocator<T>(other)
214  {}
215 
216  T* allocate(size_t size, const void* = 0)
217  {
218  if (size == 0) {
219  return 0;
220  }
221  void* p = mem::aligned::alloc(min_align, sizeof(T) * size);
222  if (!p) {
223  throw std::bad_alloc();
224  }
225  return static_cast<T*>(p);
226  }
227 
228 #if defined(__cpp_lib_allocate_at_least)
229  std::allocation_result<T*> allocate_at_least(std::size_t size)
230  {
231  return { allocate(size), size };
232  }
233 #endif
234 
235  void deallocate(T* ptr, size_t) { mem::aligned::free(ptr); }
236 
237  template<class U, class... Args>
238  void construct(U* ptr, Args&&... args)
239  {
240  ::new (static_cast<void *>(ptr)) U(std::forward<Args>(args)...);
241  }
242 
243  template <class U>
244  void destroy(U *ptr) noexcept
245  {
246  (void)ptr;
247  ptr->~U();
248  }
249 };
250 
254 template<class T, size_t Align = 64>
255 struct alignas(Align) atomic
256  : public std::atomic<T>
257  , private padding_impl::padding<T, Align>
258 {
259  public:
260  atomic() noexcept = default;
261 
262  atomic(T desired) noexcept
263  : std::atomic<T>(desired)
264  {}
265 
266  // Assignment operators have been deleted, must redefine.
267  T operator=(T x) noexcept { return std::atomic<T>::operator=(x); }
268  T operator=(T x) volatile noexcept { return std::atomic<T>::operator=(x); }
269 
270  static void* operator new(size_t count) noexcept
271  {
272  return mem::aligned::alloc(Align, count);
273  }
274 
275  static void operator delete(void* ptr) { mem::aligned::free(ptr); }
276 };
277 
279 template<typename T>
280 struct relaxed_atomic : public mem::aligned::atomic<T>
281 {
282  explicit relaxed_atomic(T value)
283  : mem::aligned::atomic<T>(value)
284  {}
285 
286  operator T() const noexcept { return this->load(mem::relaxed); }
287 
288  T operator=(T desired) noexcept
289  {
290  this->store(desired, mem::relaxed);
291  return desired;
292  }
293 };
294 
296 template<class T, size_t Alignment = 64>
297 using vector = std::vector<T, mem::aligned::allocator<T, Alignment>>;
298 
299 } // end namespace aligned
300 
301 } // end namespace mem
302 
303 // 2. --------------------------------------------------------------------------
304 
306 namespace loop {
307 
// Random-access overload: items can be indexed directly, so the loop is
// forwarded to the pool's index-based parallel_for over [0, size).
template<class Iterator, class UnaryFunction, class Pool>
void
parallel_for_each_impl(Iterator begin,
                       Iterator end,
                       int size,
                       UnaryFunction f,
                       Pool& pool,
                       std::random_access_iterator_tag)
{
    (void)end; // unused: `size` already bounds the index range
    pool.parallel_for(0, size, [=](int idx) { f(begin[idx]); });
}
320 
// Fallback for iterators without random access: collect the iterators first
// so the pool can address the items by index.
template<class Iterator, class UnaryFunction, class Pool, class IteratorCategory>
void
parallel_for_each_impl(Iterator begin,
                       Iterator end,
                       int size,
                       UnaryFunction f,
                       Pool& pool,
                       IteratorCategory)
{
    // Shared ownership keeps the iterator list alive for every copy of the
    // loop body handed to the pool.
    auto positions = std::make_shared<std::vector<Iterator>>();
    positions->reserve(static_cast<size_t>(size));
    Iterator cursor = begin;
    while (cursor != end) {
        positions->push_back(cursor);
        ++cursor;
    }
    const int count = static_cast<int>(positions->size());
    pool.parallel_for(0, count, [=](int idx) {
        f(*positions->at(static_cast<size_t>(idx)));
    });
}
339 
//! Half-open index range [pos, end) still owned by a loop worker.
struct State
{
    int pos; //!< next index to be processed
    int end; //!< one past the last index of the range
};
346 
357 template<typename Function>
358 struct Worker
359 {
360  Worker() {}
361  Worker(int begin, int end, Function fun)
362  : state{ State{ begin, end } }
363  , f{ fun }
364  {}
365 
366  Worker(Worker&& other)
367  : state{ other.state.load() }
368  , f{ std::forward<Function>(other.f) }
369  {}
370 
371  size_t tasks_left() const
372  {
373  State s = state.load();
374  return (s.end > s.pos) ? static_cast<size_t>(s.end - s.pos)
375  : static_cast<size_t>(0);
376  }
377 
378  bool done() const { return (tasks_left() == 0); }
379 
381  void run(std::shared_ptr<mem::aligned::vector<Worker>> others)
382  {
383  State s, s_old; // temporary state variables
384  do {
385  s = state.load();
386  if (s.pos < s.end) {
387  // Protect slot by trying to advance position before doing
388  // work.
389  s_old = s;
390  s.pos++;
391 
392  // Another worker might have changed the end of the range in
393  // the meanwhile. Check atomically if the state is unaltered
394  // and, if so, replace by advanced state.
395  if (state.compare_exchange_weak(s_old, s)) {
396  f(s_old.pos); // succeeded, do work
397  } else {
398  continue; // failed, try again
399  }
400  }
401  if (s.pos == s.end) {
402  // Reached end of own range, steal range from others. Range
403  // remains empty if all work is done, so we can leave the
404  // loop.
405  this->steal_range(*others);
406  }
407  } while (!this->done());
408  }
409 
411  void steal_range(mem::aligned::vector<Worker>& workers)
412  {
413  do {
414  Worker& other = find_victim(workers);
415  State s = other.state.load();
416  if (s.pos >= s.end) {
417  continue; // other range is empty by now
418  }
419 
420  // Remove second half of the range. Check atomically if the
421  // state is unaltered and, if so, replace with reduced range.
422  auto s_old = s;
423  s.end -= (s.end - s.pos + 1) / 2;
424  if (other.state.compare_exchange_weak(s_old, s)) {
425  // succeeded, update own range
426  state = State{ s.end, s_old.end };
427  break;
428  }
429  } while (!all_done(workers)); // failed steal, try again
430  }
431 
433  bool all_done(const mem::aligned::vector<Worker>& workers)
434  {
435  for (const auto& worker : workers) {
436  if (!worker.done())
437  return false;
438  }
439  return true;
440  }
441 
445  Worker& find_victim(mem::aligned::vector<Worker>& workers)
446  {
447  size_t best = 0;
448  size_t most_tasks_left = 0;
449  for (size_t i = 0; i < workers.size(); ++i) {
450  const auto tasks_left = workers[i].tasks_left();
451  if (tasks_left > most_tasks_left) {
452  best = i;
453  most_tasks_left = tasks_left;
454  }
455  }
456  return workers[best];
457  }
458 
459  mem::aligned::relaxed_atomic<State> state;
460  Function f; //< function applied to the loop index
461 };
462 
466 template<typename Function>
467 std::shared_ptr<mem::aligned::vector<Worker<Function>>>
468 create_workers(const Function& f, int begin, int end, size_t num_workers)
469 {
470  auto num_tasks = std::max(end - begin, static_cast<int>(0));
471  num_workers = std::max(num_workers, static_cast<size_t>(1));
472  auto workers = std::make_shared<mem::aligned::vector<Worker<Function>>>();
473  workers->reserve(num_workers);
474  for (size_t i = 0; i < num_workers; i++) {
475  const auto first =
476  begin + static_cast<int>(static_cast<size_t>(num_tasks) * i /
477  num_workers);
478  const auto last =
479  begin + static_cast<int>(static_cast<size_t>(num_tasks) * (i + 1) /
480  num_workers);
481  workers->emplace_back(first, last, f);
482  }
483  return workers;
484 }
485 
486 } // end namespace loop
487 
488 // 3. -------------------------------------------------------------------------
489 
491 namespace sched {
492 
494 template<typename T>
495 class RingBuffer
496 {
497  public:
498  explicit RingBuffer(size_t capacity)
499  : buffer_{
500  std::unique_ptr<std::atomic<T>[]>(new std::atomic<T>[capacity])
501  }
502  , capacity_{ capacity }
503  , mask_{ capacity - 1 }
504  {}
505 
506  size_t capacity() const { return capacity_; }
507 
508  void set_entry(size_t i, T val)
509  {
510  buffer_[i & mask_].store(val, mem::relaxed);
511  }
512 
513  T get_entry(size_t i) const
514  {
515  return buffer_[i & mask_].load(mem::relaxed);
516  }
517 
518  RingBuffer<T>* enlarged_copy(size_t bottom, size_t top) const
519  {
520  RingBuffer<T>* new_buffer = new RingBuffer{ 2 * capacity_ };
521  for (size_t i = top; i != bottom; ++i)
522  new_buffer->set_entry(i, this->get_entry(i));
523  return new_buffer;
524  }
525 
526  private:
527  std::unique_ptr<std::atomic<T>[]> buffer_;
528  size_t capacity_;
529  size_t mask_;
530 };
531 
533 class TaskQueue
534 {
535  using Task = std::function<void()>;
536 
537  struct TaskNode
538  {
539  Task task;
540  TaskNode* next{ nullptr };
541  };
542 
543  public:
545  TaskQueue(size_t capacity = 256)
546  : buffer_{ new RingBuffer<TaskNode*>(capacity) }
547  {}
548 
549  ~TaskQueue() noexcept
550  {
551  delete buffer_.load();
552  }
553 
554  TaskQueue(TaskQueue const& other) = delete;
555  TaskQueue& operator=(TaskQueue const& other) = delete;
556 
558  bool empty() const
559  {
560  return (bottom_.load(mem::relaxed) <= top_.load(mem::relaxed));
561  }
562 
565  void push(Task&& task)
566  {
567  // Must hold lock in case of multiple producers.
568  std::unique_lock<std::mutex> lk(mutex_);
569  auto b = bottom_.load(mem::relaxed);
570  auto t = top_.load(mem::acquire);
571  RingBuffer<TaskNode*>* buf_ptr = buffer_.load(mem::relaxed);
572 
573  const auto size = b - t;
574  if (buf_ptr->capacity() < size + 1) {
575  // Buffer is full, create enlarged copy before continuing.
576  auto old_buf = buf_ptr;
577  buf_ptr = std::move(buf_ptr->enlarged_copy(b, t));
578  old_buffers_.emplace_back(old_buf);
579  buffer_.store(buf_ptr, mem::release);
580  }
581 
582  auto node = acquire_node();
583  try {
584  node->task = std::forward<Task>(task);
585  } catch (...) {
586  recycle_node(node);
587  throw;
588  }
590  buf_ptr->set_entry(b, node);
591  bottom_.store(b + 1, mem::release);
592 
593  lk.unlock(); // can release before signal
594  cv_.notify_one();
595  }
596 
598  bool try_pop(Task& task)
599  {
600  auto t = top_.load(mem::acquire);
601  std::atomic_thread_fence(mem::seq_cst);
602  auto b = bottom_.load(mem::acquire);
603 
604  if (t < b) {
605  // Must load task pointer before acquiring the slot, because it
606  // could be overwritten immediately after.
607  auto node = buffer_.load(mem::acquire)->get_entry(t);
608 
609  // Atomically try to advance top.
610  if (top_.compare_exchange_strong(
611  t, t + 1, mem::seq_cst, mem::relaxed)) {
612  task = std::move(node->task); // won race, get task
613  recycle_node(node);
614  return true;
615  }
616  }
617  return false; // queue is empty or lost race
618  }
619 
621  void wait()
622  {
623  std::unique_lock<std::mutex> lk(mutex_);
624  cv_.wait(lk, [this] { return !this->empty() || stopped_; });
625  }
626 
628  void stop()
629  {
630  {
631  std::lock_guard<std::mutex> lk(mutex_);
632  stopped_ = true;
633  }
634  cv_.notify_one();
635  }
636 
637  void wake_up()
638  {
639  {
640  std::lock_guard<std::mutex> lk(mutex_);
641  }
642  cv_.notify_one();
643  }
644 
645  private:
647  mem::aligned::atomic<size_t> top_{ 0 };
648  mem::aligned::atomic<size_t> bottom_{ 0 };
649 
651  std::atomic<RingBuffer<TaskNode*>*> buffer_{ nullptr };
652 
654  std::vector<std::unique_ptr<RingBuffer<TaskNode*>>> old_buffers_;
655 
657  std::vector<std::unique_ptr<TaskNode>> allocated_nodes_;
658 
660  std::atomic<TaskNode*> free_nodes_{ nullptr };
661 
662  // Only push() calls acquire_node(), and push() holds mutex_, while many
663  // worker threads may recycle nodes concurrently.
664  TaskNode* acquire_node()
665  {
666  auto node = free_nodes_.load(mem::acquire);
667  while (node != nullptr) {
668  auto next = node->next;
669  if (free_nodes_.compare_exchange_weak(
670  node, next, mem::acquire, mem::acquire)) {
671  node->next = nullptr;
672  return node;
673  }
674  }
675 
676  allocated_nodes_.emplace_back(new TaskNode);
677  return allocated_nodes_.back().get();
678  }
679 
680  void recycle_node(TaskNode* node) noexcept
681  {
682  node->task = nullptr;
683  auto head = free_nodes_.load(mem::relaxed);
684  do {
685  node->next = head;
686  } while (!free_nodes_.compare_exchange_weak(
687  head, node, mem::release, mem::relaxed));
688  }
689 
691  std::mutex mutex_;
692  std::condition_variable cv_;
693  bool stopped_{ false };
694 };
695 
697 class TaskManager
698 {
699  public:
700  explicit TaskManager(size_t num_queues)
701  : queues_(std::max(num_queues, static_cast<size_t>(1)))
702  , num_queues_(std::max(num_queues, static_cast<size_t>(1)))
703  , owner_id_(std::this_thread::get_id())
704  {}
705 
706  TaskManager& operator=(TaskManager&& other)
707  {
708  std::swap(queues_, other.queues_);
709  num_queues_ = other.num_queues_;
710  status_ = other.status_.load();
711  num_waiting_ = other.num_waiting_.load();
712  push_idx_ = other.push_idx_.load();
713  todo_ = other.todo_.load();
714  return *this;
715  }
716 
717  void resize(size_t num_queues)
718  {
719  if (!done()) {
720  throw std::logic_error("cannot resize with pending tasks");
721  }
722  num_queues_ = std::max(num_queues, static_cast<size_t>(1));
723  if (num_queues_ > queues_.size()) {
724  queues_ = mem::aligned::vector<TaskQueue>(num_queues_);
725  // thread pool must have stopped the manager, reset
726  num_waiting_ = 0;
727  todo_ = 0;
728  status_ = Status::running;
729  }
730  }
731 
732  template<typename Task>
733  void push(Task&& task)
734  {
735  rethrow_exception(); // push() throws if a task has errored.
736  if (is_running()) {
737  todo_.fetch_add(1, mem::release);
738  try {
739  queues_[push_idx_++ % num_queues_].push(task);
740  } catch (...) {
741  report_success();
742  throw;
743  }
744  }
745  }
746 
747  template<typename Task>
748  bool try_pop(Task& task, size_t worker_id = 0)
749  {
750  // Always start pop cycle at own queue to avoid contention.
751  for (size_t k = 0; k < num_queues_; k++) {
752  if (queues_[(worker_id + k) % num_queues_].try_pop(task)) {
753  if (is_running()) {
754  return true;
755  } else {
756  // Throw away task if pool has stopped or errored.
757  return false;
758  }
759  }
760  }
761 
762  return false;
763  }
764 
765  void wake_up_all_workers()
766  {
767  for (auto& q : queues_)
768  q.wake_up();
769  }
770 
771  void wait_for_jobs(size_t id)
772  {
773  if (has_errored()) {
774  // Main thread may be waiting to reset the pool.
775  std::lock_guard<std::mutex> lk(mtx_);
776  if (++num_waiting_ == queues_.size())
777  cv_.notify_all();
778  } else {
779  ++num_waiting_;
780  }
781 
782  queues_[id].wait();
783  --num_waiting_;
784  }
785 
787  void wait_for_finish(size_t millis = 0)
788  {
789  if (called_from_owner_thread() && is_running()) {
790  auto wake_up = [this] { return (todo_ <= 0) || !is_running(); };
791  std::unique_lock<std::mutex> lk(mtx_);
792  if (millis == 0) {
793  cv_.wait(lk, wake_up);
794  } else {
795  cv_.wait_for(lk, std::chrono::milliseconds(millis), wake_up);
796  }
797  }
798  rethrow_exception();
799  }
800 
801  bool called_from_owner_thread() const
802  {
803  return (std::this_thread::get_id() == owner_id_);
804  }
805 
806  void report_success()
807  {
808 
809  auto n = todo_.fetch_sub(1, mem::release) - 1;
810  if (n == 0) {
811  // all jobs are done; lock before signal to prevent spurious
812  // failure
813  {
814  std::lock_guard<std::mutex> lk{ mtx_ };
815  }
816  cv_.notify_all();
817  }
818  }
819 
820  void report_fail(std::exception_ptr err_ptr)
821  {
822  std::lock_guard<std::mutex> lk(mtx_);
823  if (has_errored()) // only catch first exception
824  return;
825  err_ptr_ = err_ptr;
826  status_ = Status::errored;
827 
828  // Some threads may change todo_ after we stop. The large
829  // negative number forces them to exit the processing loop.
830  todo_.store(std::numeric_limits<int>::min() / 2);
831  cv_.notify_all();
832  }
833 
834  void stop()
835  {
836  {
837  std::lock_guard<std::mutex> lk(mtx_);
838  status_ = Status::stopped;
839  }
840  // Worker threads wait on queue-specific mutex -> notify all queues.
841  for (auto& q : queues_)
842  q.stop();
843  }
844 
845  void rethrow_exception()
846  {
847  // Exceptions are only thrown from the owner thread, not in workers.
848  if (called_from_owner_thread() && has_errored()) {
849  {
850  // Wait for all threads to idle so we can clean up after
851  // them.
852  std::unique_lock<std::mutex> lk(mtx_);
853  cv_.wait(lk, [this] { return num_waiting_ == queues_.size(); });
854  }
855  // Before throwing: restore defaults for potential future use of
856  // the task manager.
857  todo_ = 0;
858  auto current_exception = err_ptr_;
859  err_ptr_ = nullptr;
860  status_ = Status::running;
861 
862  std::rethrow_exception(current_exception);
863  }
864  }
865 
866  bool is_running() const
867  {
868  return status_.load(mem::relaxed) == Status::running;
869  }
870 
871  bool has_errored() const
872  {
873  return status_.load(mem::relaxed) == Status::errored;
874  }
875 
876  bool stopped() const
877  {
878  return status_.load(mem::relaxed) == Status::stopped;
879  }
880 
881  bool done() const { return (todo_.load(mem::relaxed) <= 0); }
882 
883  private:
885  mem::aligned::vector<TaskQueue> queues_;
886  size_t num_queues_;
887 
889  mem::aligned::relaxed_atomic<size_t> num_waiting_{ 0 };
890  mem::aligned::relaxed_atomic<size_t> push_idx_{ 0 };
891  mem::aligned::atomic<int> todo_{ 0 };
892 
894  const std::thread::id owner_id_;
895  enum class Status
896  {
897  running,
898  errored,
899  stopped
900  };
901  mem::aligned::atomic<Status> status_{ Status::running };
902  std::mutex mtx_;
903  std::condition_variable cv_;
904  std::exception_ptr err_ptr_{ nullptr };
905 };
906 
//! @brief lists the ids of the cores the calling thread may run on.
//!
//! On Linux, the ids are read from the calling thread's affinity mask
//! (`pthread_getaffinity_np`) in increasing order; on other platforms the
//! list is empty.
//! @throws std::runtime_error if the affinity mask cannot be retrieved.
inline std::vector<size_t>
get_avail_cores()
{
    std::vector<size_t> avail_cores;
    avail_cores.reserve(std::thread::hardware_concurrency());
#if (defined __linux__)
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    int rc = pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    if (rc != 0) {
        throw std::runtime_error("Error calling pthread_getaffinity_np");
    }
    // Scan the whole mask rather than [0, hardware_concurrency()): core ids
    // need not be contiguous (e.g. with offline CPUs), and this also keeps
    // CPU_ISSET within CPU_SETSIZE.
    for (size_t id = 0; id < static_cast<size_t>(CPU_SETSIZE); id++) {
        if (CPU_ISSET(id, &cpuset)) {
            avail_cores.push_back(id);
        }
    }
#endif
    return avail_cores;
}
928 
929 inline size_t
930 num_cores_avail()
931 {
932 #if (defined __linux__)
933  return get_avail_cores().size();
934 #endif
935  return std::thread::hardware_concurrency();
936 }
937 
938 } // end namespace sched
939 
940 // 4. ------------------------------------------------------------------------
941 
944 {
945  public:
949  explicit ThreadPool(size_t threads = sched::num_cores_avail())
950  : task_manager_{ threads }
951  {
952  set_active_threads(threads);
953  }
954 
955  ~ThreadPool()
956  {
957  task_manager_.stop();
958  join_threads();
959  }
960 
961  ThreadPool(ThreadPool&&) = delete;
962  ThreadPool(const ThreadPool&) = delete;
963  ThreadPool& operator=(const ThreadPool&) = delete;
964  ThreadPool& operator=(ThreadPool&& other) = delete;
965 
968  {
969 #ifdef _WIN32
970  // Must leak resource, because windows + R deadlock otherwise.
971  // Memory is released on shutdown.
972  static auto ptr = new ThreadPool;
973  return *ptr;
974 #else
975  static ThreadPool instance_;
976  return instance_;
977 #endif
978  }
979 
983  void set_active_threads(size_t threads)
984  {
985  if (!task_manager_.called_from_owner_thread())
986  return;
987 
988  if (threads == active_threads_.load(mem::relaxed)) {
989  return;
990  }
991 
992  this->wait();
993  if (workers_.size() > 0) {
994  task_manager_.stop();
995  join_threads();
996  workers_.clear();
997  }
998 
999  task_manager_ = quickpool::sched::TaskManager{ threads };
1000  workers_ = std::vector<std::thread>{ threads };
1001  for (size_t id = 0; id < threads; ++id) {
1002  add_worker(id);
1003  }
1004 #if (defined __linux__)
1005  if (threads > 0) {
1006  set_thread_affinity();
1007  }
1008 #endif
1009  active_threads_ = threads;
1010  }
1011 
1013  size_t get_active_threads() const { return active_threads_; }
1014 
1018  template<class Function, class... Args>
1019  void push(Function&& f, Args&&... args)
1020  {
1021  if (active_threads_ == 0) {
1022  std::forward<Function>(f)(std::forward<Args>(args)...);
1023  return;
1024  }
1025  task_manager_.push(detail::Task<Function, Args...>::make(
1026  std::forward<Function>(f), std::forward<Args>(args)...));
1027  }
1028 
1034  template<class Function, class... Args>
1035  auto async(Function&& f, Args&&... args)
1036  -> std::future<typename detail::Task<Function, Args...>::type>
1037  {
1039  std::forward<Function>(f), std::forward<Args>(args)...);
1040  using result_t = typename detail::Task<Function, Args...>::type;
1041  using pack_t = std::packaged_task<result_t()>;
1042  auto task_ptr = std::make_shared<pack_t>(std::move(pack));
1043  this->push([task_ptr] { (*task_ptr)(); });
1044  return task_ptr->get_future();
1045  }
1046 
1056  template<class UnaryFunction>
1057  void parallel_for(int begin, int end, UnaryFunction f)
1058  {
1059  if (end <= begin) {
1060  return;
1061  }
1062  const auto active_threads = active_threads_.load(mem::relaxed);
1063  if (active_threads == 0) {
1064  for (auto i = begin; i < end; ++i) {
1065  f(i);
1066  }
1067  return;
1068  }
1069 
1070  // each worker has its dedicated range, but can steal part of
1071  // another worker's ranges when done with own
1072  const auto num_tasks = static_cast<size_t>(end - begin);
1073  const auto n =
1074  std::min(std::max(active_threads, static_cast<size_t>(1)), num_tasks);
1075  auto workers = loop::create_workers<UnaryFunction>(f, begin, end, n);
1076  for (size_t k = 0; k < n; k++) {
1077  this->push([=] { workers->at(k).run(workers); });
1078  }
1079  this->wait();
1080  }
1081 
1091  template<class Items, class UnaryFunction>
1092  inline void parallel_for_each(Items& items, UnaryFunction f)
1093  {
1094  auto begin = std::begin(items);
1095  auto size = std::distance(begin, std::end(items));
1096  if (size <= 0) {
1097  return;
1098  }
1099  if (size > std::numeric_limits<int>::max()) {
1100  throw std::length_error("parallel_for_each range is too large");
1101  }
1102  typedef typename std::iterator_traits<decltype(begin)>::iterator_category
1103  iterator_category;
1104  loop::parallel_for_each_impl(begin,
1105  std::end(items),
1106  static_cast<int>(size),
1107  f,
1108  *this,
1109  iterator_category{});
1110  }
1111 
1116  void wait(size_t millis = 0) { task_manager_.wait_for_finish(millis); }
1117 
1119  // state, and rethrows an exception if one is pending.
1121  {
1122  task_manager_.report_fail(std::current_exception());
1123  task_manager_.wait_for_finish();
1124  task_manager_.rethrow_exception();
1125  }
1126 
1128  bool done() const { return task_manager_.done(); }
1129 
1131  static void* operator new(size_t count)
1132  {
1133  return mem::aligned::alloc(alignof(ThreadPool), count);
1134  }
1135 
1137  static void operator delete(void* ptr) { mem::aligned::free(ptr); }
1138 
1139  private:
1141  void join_threads()
1142  {
1143  for (auto& worker : workers_) {
1144  if (worker.joinable())
1145  worker.join();
1146  }
1147  }
1148 
1151  void add_worker(size_t id)
1152  {
1153  workers_[id] = std::thread([&, id] {
1154  std::function<void()> task;
1155  while (!task_manager_.stopped()) {
1156  task_manager_.wait_for_jobs(id);
1157  do {
1158  // inner while to save some time calling done()
1159  while (task_manager_.try_pop(task, id))
1160  this->execute_safely(task);
1161  } while (!task_manager_.done());
1162  }
1163  });
1164  }
1165 
1166 #if (defined __linux__)
1168  void set_thread_affinity()
1169  {
1170  cpu_set_t cpuset;
1171  auto avail_cores = sched::get_avail_cores();
1172  for (size_t id = 0; id < workers_.size(); id++) {
1173  CPU_ZERO(&cpuset);
1174  CPU_SET(avail_cores[id % avail_cores.size()], &cpuset);
1175  int rc = pthread_setaffinity_np(
1176  workers_.at(id).native_handle(), sizeof(cpu_set_t), &cpuset);
1177  if (rc != 0) {
1178  throw std::runtime_error(
1179  "Error calling pthread_setaffinity_np");
1180  }
1181  }
1182  }
1183 #endif
1184 
1185  void execute_safely(std::function<void()>& task)
1186  {
1187  try {
1188  task();
1189  task_manager_.report_success();
1190  } catch (...) {
1191  task_manager_.report_fail(std::current_exception());
1192  }
1193  }
1194 
1195  sched::TaskManager task_manager_;
1196  std::vector<std::thread> workers_;
1197  std::atomic_size_t active_threads_{ 0 };
1198 };
1199 
1200 // 5. ---------------------------------------------------
1201 
1203 
1207 template<class Function, class... Args>
1208 inline void
1209 push(Function&& f, Args&&... args)
1210 {
1211  ThreadPool::global_instance().push(std::forward<Function>(f),
1212  std::forward<Args>(args)...);
1213 }
1214 
1220 template<class Function, class... Args>
1221 inline auto
1222 async(Function&& f, Args&&... args)
1223  -> std::future<typename detail::Task<Function, Args...>::type>
1224 {
1225  return ThreadPool::global_instance().async(std::forward<Function>(f),
1226  std::forward<Args>(args)...);
1227 }
1228 
1231 inline void
1233 {
1235 }
1236 
1238 inline bool
1240 {
1241  return ThreadPool::global_instance().done();
1242 }
1243 
1247 inline void
1248 set_active_threads(size_t threads)
1249 {
1251 }
1252 
1255 inline size_t
1257 {
1259 }
1260 
1270 template<class UnaryFunction>
1271 inline void
1272 parallel_for(int begin, int end, UnaryFunction&& f)
1273 {
1275  begin, end, std::forward<UnaryFunction>(f));
1276 }
1277 
1287 template<class Items, class UnaryFunction>
1288 inline void
1289 parallel_for_each(Items& items, UnaryFunction&& f)
1290 {
1292  items, std::forward<UnaryFunction>(f));
1293 }
1294 
1295 } // end namespace quickpool
1296 
1297 #undef QUICKPOOL_HAS_CPP17
A work stealing thread pool.
Definition: quickpool.hpp:944
void wait(size_t millis=0)
waits for all jobs currently running on the thread pool. Has no effect when called from threads other...
Definition: quickpool.hpp:1116
void push(Function &&f, Args &&... args)
pushes a job to the thread pool.
Definition: quickpool.hpp:1019
auto async(Function &&f, Args &&... args) -> std::future< typename detail::Task< Function, Args... >::type >
executes a job asynchronously on the thread pool.
Definition: quickpool.hpp:1035
void stop_and_reset()
Stops the pool, waits for all tasks to finish, resets to neutral.
Definition: quickpool.hpp:1120
size_t get_active_threads() const
retrieves the number of active worker threads in the thread pool.
Definition: quickpool.hpp:1013
void set_active_threads(size_t threads)
sets the number of active worker threads in the thread pool.
Definition: quickpool.hpp:983
static ThreadPool & global_instance()
returns a reference to the global thread pool instance.
Definition: quickpool.hpp:967
bool done() const
checks whether all jobs are done.
Definition: quickpool.hpp:1128
void parallel_for(int begin, int end, UnaryFunction f)
computes an index-based parallel for loop.
Definition: quickpool.hpp:1057
ThreadPool(size_t threads=sched::num_cores_avail())
constructs a thread pool.
Definition: quickpool.hpp:949
void parallel_for_each(Items &items, UnaryFunction f)
computes an iterator-based parallel for loop.
Definition: quickpool.hpp:1092
quickpool namespace
Definition: quickpool.hpp:69
void push(Function &&f, Args &&... args)
Free-standing functions (main API)
Definition: quickpool.hpp:1209
void wait()
waits for all jobs currently running on the global thread pool. Has no effect when not called from ma...
Definition: quickpool.hpp:1232
void parallel_for(int begin, int end, UnaryFunction &&f)
computes an index-based parallel for loop.
Definition: quickpool.hpp:1272
void parallel_for_each(Items &items, UnaryFunction &&f)
computes an iterator-based parallel for loop.
Definition: quickpool.hpp:1289
void set_active_threads(size_t threads)
sets the number of active worker threads in the global thread pool.
Definition: quickpool.hpp:1248
size_t get_active_threads()
retrieves the number of active worker threads in the global thread pool.
Definition: quickpool.hpp:1256
auto async(Function &&f, Args &&... args) -> std::future< typename detail::Task< Function, Args... >::type >
executes a job asynchronously on the global thread pool.
Definition: quickpool.hpp:1222
bool done()
checks whether all global jobs are done.
Definition: quickpool.hpp:1239