25 #include <condition_variable>
37 #include <type_traits>
41 #if (defined __linux__ || defined AFFINITY)
45 #if __cplusplus >= 201703L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201703L)
46 #define QUICKPOOL_HAS_CPP17 1
48 #define QUICKPOOL_HAS_CPP17 0
// detail::Task (fragment): deduces the result type of calling `Function` with
// `Args...` and packages such a call into a callable that takes no arguments.
// NOTE(review): this listing omits several lines of the original definition
// (struct header, braces, the #else/#endif lines).
73 template<
class Function,
class... Args>
76 #if QUICKPOOL_HAS_CPP17
// C++17 path: result type via std::invoke_result_t.
77 using type = std::invoke_result_t<Function, Args...>;
// Returns a lambda that owns the callable and a tuple of (decayed) copies of
// the arguments and invokes them with std::apply; `mutable` lets the stored
// callable and arguments be used as non-const lvalues.
79 static auto make(Function&& f, Args&&... args)
81 return [f = std::forward<Function>(f),
82 args = std::make_tuple(std::forward<Args>(args)...)]()
mutable {
83 return std::apply(f, args);
// Fallback used when QUICKPOOL_HAS_CPP17 is 0: result type deduced with
// decltype on a call expression.
87 using type = decltype(std::declval<Function>()(std::declval<Args>()...));
// Packages the call with std::bind; the trailing return type names the bind
// expression type explicitly.
89 static auto make(Function&& f, Args&&... args)
90 -> decltype(std::bind(std::forward<Function>(f),
91 std::forward<Args>(args)...))
93 return std::bind(std::forward<Function>(f),
94 std::forward<Args>(args)...);
// Short aliases for the std::memory_order constants, used elsewhere as
// mem::relaxed, mem::acquire, mem::release and mem::seq_cst.
107 static constexpr std::memory_order relaxed = std::memory_order_relaxed;
108 static constexpr std::memory_order acquire = std::memory_order_acquire;
109 static constexpr std::memory_order release = std::memory_order_release;
110 static constexpr std::memory_order seq_cst = std::memory_order_seq_cst;
// Helpers that compute trailing padding for the over-aligned atomic wrapper.
116 namespace padding_impl {
// Remainder of a / b written as a - b * (a / b). It is used inside a template
// argument below, so it has to be usable in constant expressions.
120 mod(
size_t a,
size_t b)
122 return a - b * (a / b);
// Padding that fills the rest of an Align-sized slot after a std::atomic<T>
// (presumably the padding_bytes struct referenced below; its header line is
// not shown in this listing).
127 template<
class T,
size_t Align>
130 static constexpr
size_t free_space =
131 Align - mod(
sizeof(std::atomic<T>), Align);
// Guarantees a non-zero array length.
132 static constexpr
size_t required = free_space > 1 ? free_space : 1;
133 char padding_[required];
// Uses padding_bytes only when sizeof(std::atomic<T>) is not already a
// multiple of Align; the alternative branch is not shown in this listing.
141 template<
class T,
size_t Align>
143 : std::conditional<mod(sizeof(std::atomic<T>), Align) != 0,
144 padding_bytes<T, Align>,
// Allocates `size` bytes aligned to `alignment`, which is first raised to at
// least alignof(void*).
// - Over-allocates with std::malloc and reserves a void* slot.
// - Aligns the pointer placed after that slot.
// - Stores the raw malloc pointer immediately before the returned address so
//   that free() can recover it.
// NOTE(review): the lines between the malloc call and the pointer adjustment
// are not shown; confirm that nullptr is returned when malloc fails.
156 alloc(
size_t alignment,
size_t size) noexcept
159 alignment = (alignment >=
alignof(
void*)) ? alignment :
alignof(
void*);
// Room for the payload, the worst-case alignment shift and the back-pointer.
162 size_t space = size + alignment +
sizeof(
void*);
163 void* p = std::malloc(space);
// Skip the slot reserved for the back-pointer before aligning.
169 void* p_algn =
static_cast<char*
>(p) +
sizeof(
void*);
170 space -=
sizeof(
void*);
// `space` still contains `alignment` spare bytes, so (assuming `alignment` is
// a power of two, as std::align requires) the adjustment always fits and the
// return value is discarded.
173 (void)std::align(alignment, size, p_algn, space);
// Store the original malloc pointer just before the aligned block.
178 *(
static_cast<void**
>(p_algn) - 1) = p;
// Releases memory obtained from alloc() by reading the malloc pointer stored
// just before `ptr`.
// NOTE(review): any nullptr check in this function is not shown here.
184 free(
void* ptr) noexcept
187 std::free(*(
static_cast<void**
>(ptr) - 1));
// Standard-library-compatible allocator whose storage comes from
// mem::aligned::alloc/free, aligned to at least `Alignment` bytes (default 64).
193 template<
class T, std::
size_t Alignment = 64>
194 class allocator :
public std::allocator<T>
// Effective alignment: never below alignof(void*), matching alloc().
197 static constexpr
size_t min_align =
198 (Alignment >=
alignof(
void*)) ? Alignment :
alignof(
void*);
// Rebinding to element type U keeps the same alignment (presumably the nested
// rebind<U>::other; the enclosing lines are not shown).
204 typedef allocator<U, Alignment> other;
208 : std::allocator<T>()
// Converting constructor from an allocator for another element type/alignment.
211 template<
class U, std::
size_t UAlignment>
212 allocator(
const allocator<U, UAlignment>& other) noexcept
213 : std::allocator<T>(other)
// Allocates `size` elements of T; the hint argument is ignored. Throws
// std::bad_alloc on failure (presumably when alloc() returns nullptr; the
// guarding condition is not shown).
// NOTE(review): sizeof(T) * size is not checked for overflow in the visible lines.
216 T* allocate(
size_t size,
const void* = 0)
221 void* p = mem::aligned::alloc(min_align,
sizeof(T) * size);
223 throw std::bad_alloc();
225 return static_cast<T*
>(p);
// C++23 allocate_at_least: reports exactly the requested element count.
228 #if defined(__cpp_lib_allocate_at_least)
229 std::allocation_result<T*> allocate_at_least(std::size_t size)
231 return { allocate(size), size };
// The element count is not needed to release the block.
235 void deallocate(T* ptr,
size_t) { mem::aligned::free(ptr); }
// Constructs a U in place at `ptr` from the forwarded arguments.
237 template<
class U,
class... Args>
238 void construct(U* ptr, Args&&... args)
240 ::new (
static_cast<void *
>(ptr)) U(std::forward<Args>(args)...);
// Destroys *ptr (body not shown in this listing).
244 void destroy(U *ptr) noexcept
// std::atomic<T> wrapper declared alignas(Align) (default 64) that also
// derives from padding_impl::padding, presumably so that neighbouring
// instances never share a cache line.
254 template<
class T,
size_t Align = 64>
255 struct alignas(Align) atomic
256 :
public std::atomic<T>
257 ,
private padding_impl::padding<T, Align>
260 atomic() noexcept = default;
262 atomic(T desired) noexcept
263 : std::atomic<T>(desired)
// Assignment forwards to std::atomic's operator=, i.e. a seq_cst store.
267 T operator=(T x) noexcept {
return std::atomic<T>::operator=(x); }
268 T operator=(T x)
volatile noexcept {
return std::atomic<T>::operator=(x); }
// Heap allocation honours the Align over-alignment. Because this operator new
// is noexcept, a nullptr result from alloc() is reported as a null new-expression.
270 static void*
operator new(
size_t count) noexcept
272 return mem::aligned::alloc(Align, count);
275 static void operator delete(
void* ptr) { mem::aligned::free(ptr); }
// Aligned atomic whose implicit conversion and assignment use relaxed ordering
// (its template header line is not shown). Explicit member calls such as
// load() or compare_exchange_weak() keep std::atomic's default ordering.
280 struct relaxed_atomic :
public mem::aligned::atomic<T>
282 explicit relaxed_atomic(T value)
283 : mem::aligned::atomic<T>(value)
286 operator T() const noexcept {
return this->load(mem::relaxed); }
288 T operator=(T desired) noexcept
290 this->store(desired, mem::relaxed);
// std::vector whose storage is obtained through mem::aligned::allocator, so
// the element array starts on an `Alignment`-byte boundary (default 64).
296 template<
class T,
size_t Alignment = 64>
297 using vector = std::vector<T, mem::aligned::allocator<T, Alignment>>;
// Random-access overload: indexes the range directly inside pool.parallel_for.
// `f` and `begin` are captured by value in the task lambda.
308 template<
class Iterator,
class UnaryFunction,
class Pool>
310 parallel_for_each_impl(Iterator begin,
315 std::random_access_iterator_tag)
318 pool.parallel_for(0,
static_cast<int>(size), [=](
int i) { f(begin[i]); });
// Generic overload for other iterator categories:
// - First collects every iterator into a vector so tasks can address elements
//   by index.
// - The vector is held by a shared_ptr that is captured by value, which keeps
//   it alive for as long as any copy of the task lambda exists.
321 template<
class Iterator,
class UnaryFunction,
class Pool,
class IteratorCategory>
323 parallel_for_each_impl(Iterator begin,
330 auto iterators = std::make_shared<std::vector<Iterator>>();
331 iterators->reserve(
static_cast<size_t>(size));
332 for (
auto it = begin; it != end; ++it) {
333 iterators->push_back(it);
335 pool.parallel_for(0,
static_cast<int>(iterators->size()), [=](
int i) {
336 f(*iterators->at(
static_cast<size_t>(i)));
// Worker (fragment): owns a sub-range of loop indices that it processes
// itself. Once its own range is exhausted it steals part of another worker's
// remaining range. The State struct is not shown; from its uses, its first
// field is `pos` and its second is `end`.
357 template<
typename Function>
361 Worker(
int begin,
int end, Function fun)
362 : state{ State{ begin, end } }
// Move constructor: copies a snapshot of the other worker's range state.
// std::forward<Function> acts as std::move when Function is a non-reference type.
366 Worker(Worker&& other)
367 : state{ other.state.load() }
368 , f{ std::forward<Function>(other.f) }
// Number of indices not yet claimed; 0 when the range is empty or inverted.
371 size_t tasks_left()
const
373 State s = state.load();
374 return (s.end > s.pos) ?
static_cast<size_t>(s.end - s.pos)
375 :
static_cast<size_t>(0);
378 bool done()
const {
return (tasks_left() == 0); }
// Updates the own range with compare_exchange_weak (the index-claiming lines
// and the call to f are not shown). When the range is exhausted it calls
// steal_range(), and it loops until this worker is done.
381 void run(std::shared_ptr<mem::aligned::vector<Worker>> others)
395 if (state.compare_exchange_weak(s_old, s)) {
401 if (s.pos == s.end) {
405 this->steal_range(*others);
407 }
while (!this->
done());
// Picks the worker with the most remaining indices as the victim. It shrinks
// the victim's end by half of its remaining indices (rounded up) with
// compare_exchange_weak, and on success takes [new end, old end) for itself.
// The loop repeats while not all workers are done; the other exit paths are
// on lines not shown.
411 void steal_range(mem::aligned::vector<Worker>& workers)
414 Worker& other = find_victim(workers);
415 State s = other.state.load();
416 if (s.pos >= s.end) {
423 s.end -= (s.end - s.pos + 1) / 2;
424 if (other.state.compare_exchange_weak(s_old, s)) {
// Relaxed store through relaxed_atomic::operator=.
426 state = State{ s.end, s_old.end };
429 }
while (!all_done(workers));
// Presumably true when every worker is done (loop body not shown).
433 bool all_done(
const mem::aligned::vector<Worker>& workers)
435 for (
const auto& worker : workers) {
// Returns the worker with the largest tasks_left(); `best` is declared and
// updated on lines not shown.
445 Worker& find_victim(mem::aligned::vector<Worker>& workers)
448 size_t most_tasks_left = 0;
449 for (
size_t i = 0; i < workers.size(); ++i) {
450 const auto tasks_left = workers[i].tasks_left();
451 if (tasks_left > most_tasks_left) {
453 most_tasks_left = tasks_left;
456 return workers[best];
// Range state. The implicit conversion and assignment are relaxed, while the
// explicit load() and compare_exchange_weak() calls above use std::atomic's
// default (seq_cst) ordering.
459 mem::aligned::relaxed_atomic<State> state;
// Splits [begin, end) into num_workers (at least 1) contiguous chunks and
// returns them as shared Worker objects. A negative range length is treated
// as empty.
// - Chunk i starts at begin + num_tasks * i / divisor and ends at
//   begin + num_tasks * (i + 1) / divisor.
// - The divisor is on a line not shown; presumably it is num_workers.
// - The vector is reserved up front, so emplace_back never reallocates.
// NOTE(review): `end - begin` is computed in int and can overflow for extreme
// ranges.
466 template<
typename Function>
467 std::shared_ptr<mem::aligned::vector<Worker<Function>>>
468 create_workers(
const Function& f,
int begin,
int end,
size_t num_workers)
470 auto num_tasks = std::max(end - begin,
static_cast<int>(0));
471 num_workers = std::max(num_workers,
static_cast<size_t>(1));
472 auto workers = std::make_shared<mem::aligned::vector<Worker<Function>>>();
473 workers->reserve(num_workers);
474 for (
size_t i = 0; i < num_workers; i++) {
476 begin +
static_cast<int>(
static_cast<size_t>(num_tasks) * i /
479 begin +
static_cast<int>(
static_cast<size_t>(num_tasks) * (i + 1) /
481 workers->emplace_back(first, last, f);
// RingBuffer (fragment): fixed-capacity array of atomics, indexed through the
// bit mask capacity - 1.
// NOTE(review): `i & mask_` equals `i % capacity` only when capacity is a
// power of two. TaskQueue starts at 256 by default and doubles on growth,
// which keeps that invariant.
498 explicit RingBuffer(
size_t capacity)
500 std::unique_ptr<std::atomic<T>[]>(new std::atomic<T>[capacity])
502 , capacity_{ capacity }
503 , mask_{ capacity - 1 }
506 size_t capacity()
const {
return capacity_; }
// Slots are accessed with relaxed ordering. TaskQueue publishes entries with
// a release store to bottom_ and observes them after an acquire load of bottom_.
508 void set_entry(
size_t i, T val)
510 buffer_[i & mask_].store(val, mem::relaxed);
513 T get_entry(
size_t i)
const
515 return buffer_[i & mask_].load(mem::relaxed);
// Returns a newly allocated buffer of twice the capacity that holds the
// entries with logical indices [top, bottom) at the same logical indices.
// The caller takes ownership of the returned raw pointer.
518 RingBuffer<T>* enlarged_copy(
size_t bottom,
size_t top)
const
520 RingBuffer<T>* new_buffer =
new RingBuffer{ 2 * capacity_ };
521 for (
size_t i = top; i != bottom; ++i)
522 new_buffer->set_entry(i, this->get_entry(i));
527 std::unique_ptr<std::atomic<T>[]> buffer_;
// TaskQueue (fragment): a queue of type-erased tasks backed by a growable ring
// buffer.
// - push() is serialized by mutex_.
// - try_pop() claims the entry at top_ with a compare-exchange, which allows
//   concurrent poppers.
535 using Task = std::function<void()>;
// Node holding one task; `next` links nodes in the free list.
540 TaskNode* next{
nullptr };
// Starts with a ring buffer of `capacity` slots (default 256).
545 TaskQueue(
size_t capacity = 256)
546 : buffer_{ new RingBuffer<TaskNode*>(capacity) }
// Deletes only the current buffer; replaced buffers are owned by old_buffers_.
549 ~TaskQueue() noexcept
551 delete buffer_.load();
554 TaskQueue(TaskQueue
const& other) =
delete;
555 TaskQueue& operator=(TaskQueue
const& other) =
delete;
// Relaxed snapshot (presumably the body of empty()): true when no entries
// remain between top_ and bottom_.
560 return (bottom_.load(mem::relaxed) <= top_.load(mem::relaxed));
// Appends a task at bottom_ while holding mutex_. When the buffer is full it
// is replaced by an enlarged copy. The old buffer is kept alive in
// old_buffers_ instead of being deleted, presumably because a concurrent
// try_pop() may still be reading from it.
565 void push(Task&& task)
568 std::unique_lock<std::mutex> lk(mutex_);
569 auto b = bottom_.load(mem::relaxed);
570 auto t = top_.load(mem::acquire);
571 RingBuffer<TaskNode*>* buf_ptr = buffer_.load(mem::relaxed);
573 const auto size = b - t;
574 if (buf_ptr->capacity() < size + 1) {
576 auto old_buf = buf_ptr;
// NOTE(review): std::move on a raw pointer is a no-op; plain assignment is equivalent.
577 buf_ptr = std::move(buf_ptr->enlarged_copy(b, t));
578 old_buffers_.emplace_back(old_buf);
579 buffer_.store(buf_ptr, mem::release);
// Reuses a recycled node when one is available (mutex_ is held here).
582 auto node = acquire_node();
584 node->task = std::forward<Task>(task);
// Write the slot first, then publish it by advancing bottom_ with release.
590 buf_ptr->set_entry(b, node);
591 bottom_.store(b + 1, mem::release);
// Takes the oldest entry (the one at top_) by advancing top_ with a seq_cst
// compare-exchange, and moves its task into `task` on success.
// - The seq_cst fence orders the top_ load before the bottom_ load.
// - The emptiness check, return values and node recycling are on lines not
//   shown.
598 bool try_pop(Task& task)
600 auto t = top_.load(mem::acquire);
601 std::atomic_thread_fence(mem::seq_cst);
602 auto b = bottom_.load(mem::acquire);
607 auto node = buffer_.load(mem::acquire)->get_entry(t);
610 if (top_.compare_exchange_strong(
611 t, t + 1, mem::seq_cst, mem::relaxed)) {
612 task = std::move(node->task);
// Blocks under mutex_ until the queue is non-empty or stopped_ is set (the
// enclosing function header is not shown).
623 std::unique_lock<std::mutex> lk(mutex_);
624 cv_.wait(lk, [
this] {
return !this->empty() || stopped_; });
// Further operations that lock mutex_; their bodies are not shown.
631 std::lock_guard<std::mutex> lk(mutex_);
640 std::lock_guard<std::mutex> lk(mutex_);
// Queue indices: live entries occupy logical slots [top_, bottom_).
647 mem::aligned::atomic<size_t> top_{ 0 };
648 mem::aligned::atomic<size_t> bottom_{ 0 };
// Current ring buffer; deleted in the destructor.
651 std::atomic<RingBuffer<TaskNode*>*> buffer_{
nullptr };
// Buffers replaced during growth. No removal is visible, so they are kept
// until the queue is destroyed.
654 std::vector<std::unique_ptr<RingBuffer<TaskNode*>>> old_buffers_;
// Owns every TaskNode this queue has allocated.
657 std::vector<std::unique_ptr<TaskNode>> allocated_nodes_;
// Head of an intrusive CAS-based stack of reusable nodes.
660 std::atomic<TaskNode*> free_nodes_{
nullptr };
// Pops a node from the free list, or allocates a new node owned by
// allocated_nodes_.
// - Within this listing it is only called from push(), i.e. while mutex_ is
//   held, so there is a single popper.
// - recycle_node() only pushes.
664 TaskNode* acquire_node()
666 auto node = free_nodes_.load(mem::acquire);
667 while (node !=
nullptr) {
668 auto next = node->next;
669 if (free_nodes_.compare_exchange_weak(
670 node, next, mem::acquire, mem::acquire)) {
671 node->next =
nullptr;
676 allocated_nodes_.emplace_back(
new TaskNode);
677 return allocated_nodes_.back().get();
// Clears the stored task (releasing its captures) and pushes the node onto the
// free list with a release compare-exchange loop.
680 void recycle_node(TaskNode* node) noexcept
682 node->task =
nullptr;
683 auto head = free_nodes_.load(mem::relaxed);
686 }
while (!free_nodes_.compare_exchange_weak(
687 head, node, mem::release, mem::relaxed));
// Signals waiters in the wait fragment; stopped_ is read under mutex_.
692 std::condition_variable cv_;
693 bool stopped_{
false };
// TaskManager (fragment): owns the task queues, counts outstanding tasks in
// todo_, and tracks the pool status and a stored worker error.
// The constructor creates max(num_queues, 1) queues and records the
// constructing thread as owner.
700 explicit TaskManager(
size_t num_queues)
701 : queues_(std::max(num_queues, static_cast<size_t>(1)))
702 , num_queues_(std::max(num_queues, static_cast<size_t>(1)))
703 , owner_id_(std::this_thread::get_id())
// Move assignment: swaps the queues and copies the remaining state member by
// member. Each load is atomic, but the assignment as a whole is not.
// NOTE(review): owner_id_ is const and is therefore not transferred.
706 TaskManager& operator=(TaskManager&& other)
708 std::swap(queues_, other.queues_);
709 num_queues_ = other.num_queues_;
710 status_ = other.status_.load();
711 num_waiting_ = other.num_waiting_.load();
712 push_idx_ = other.push_idx_.load();
713 todo_ = other.todo_.load();
// Changes the number of queues in use.
// - Throws std::logic_error when tasks are pending (the guard is not shown).
// - In the visible lines the queue storage is rebuilt only when growing;
//   otherwise only num_queues_ changes.
717 void resize(
size_t num_queues)
720 throw std::logic_error(
"cannot resize with pending tasks");
722 num_queues_ = std::max(num_queues,
static_cast<size_t>(1));
723 if (num_queues_ > queues_.size()) {
724 queues_ = mem::aligned::vector<TaskQueue>(num_queues_);
728 status_ = Status::running;
// Counts the task in todo_ before enqueuing it, then distributes tasks
// round-robin over the first num_queues_ queues.
// NOTE(review): `task` is passed as an lvalue, so the callable is copied into
// the std::function temporary. An lvalue std::function argument would not bind
// to TaskQueue::push(Task&&) at all. std::forward<Task>(task) would fix both.
732 template<
typename Task>
733 void push(Task&& task)
737 todo_.fetch_add(1, mem::release);
739 queues_[push_idx_++ % num_queues_].push(task);
// Tries each queue once, starting with queue `worker_id`.
747 template<
typename Task>
748 bool try_pop(Task& task,
size_t worker_id = 0)
751 for (
size_t k = 0; k < num_queues_; k++) {
752 if (queues_[(worker_id + k) % num_queues_].try_pop(task)) {
// Visits every queue to wake its waiters (loop body not shown).
765 void wake_up_all_workers()
767 for (
auto& q : queues_)
// Called by worker `id` before it waits for work. Counts waiting workers under
// mtx_; what happens when all of them are waiting (presumably notifying cv_)
// is not shown.
// NOTE(review): this compares against queues_.size(), not num_queues_.
771 void wait_for_jobs(
size_t id)
775 std::lock_guard<std::mutex> lk(mtx_);
776 if (++num_waiting_ == queues_.size())
// On the owner thread while running, blocks until todo_ <= 0 or the status is
// no longer running; calls from other threads skip the wait.
// - One branch waits without a timeout, the other waits at most `millis` ms.
// - The selecting condition (presumably millis == 0 means no timeout) is not
//   shown.
787 void wait_for_finish(
size_t millis = 0)
789 if (called_from_owner_thread() && is_running()) {
790 auto wake_up = [
this] {
return (todo_ <= 0) || !is_running(); };
791 std::unique_lock<std::mutex> lk(mtx_);
793 cv_.wait(lk, wake_up);
795 cv_.wait_for(lk, std::chrono::milliseconds(millis), wake_up);
// True when called from the thread that constructed this manager.
801 bool called_from_owner_thread()
const
803 return (std::this_thread::get_id() == owner_id_);
// Marks one task as finished; `n` is the number of tasks still outstanding.
// The notification under mtx_ (presumably when n reaches 0) is not fully shown.
806 void report_success()
809 auto n = todo_.fetch_sub(1, mem::release) - 1;
814 std::lock_guard<std::mutex> lk{ mtx_ };
// Records a failure: sets status_ to errored under mtx_ (storing err_ptr
// happens on a line not shown). It then forces todo_ far below zero so that
// done() and wait_for_finish() treat all work as finished. The value is
// halved, presumably to leave room for further decrements by tasks still
// running.
820 void report_fail(std::exception_ptr err_ptr)
822 std::lock_guard<std::mutex> lk(mtx_);
826 status_ = Status::errored;
830 todo_.store(std::numeric_limits<int>::min() / 2);
// Marks the manager as stopped under mtx_, then visits every queue (the loop
// body, presumably stopping each queue, is not shown).
837 std::lock_guard<std::mutex> lk(mtx_);
838 status_ = Status::stopped;
841 for (
auto& q : queues_)
// On the owner thread after an error:
// - waits until the number of waiting workers equals queues_.size();
// - resets the status to running;
// - rethrows the stored exception.
845 void rethrow_exception()
848 if (called_from_owner_thread() && has_errored()) {
852 std::unique_lock<std::mutex> lk(mtx_);
853 cv_.wait(lk, [
this] {
return num_waiting_ == queues_.size(); });
858 auto current_exception = err_ptr_;
860 status_ = Status::running;
862 std::rethrow_exception(current_exception);
// Status queries; all use relaxed loads of status_.
866 bool is_running()
const
868 return status_.load(mem::relaxed) == Status::running;
871 bool has_errored()
const
873 return status_.load(mem::relaxed) == Status::errored;
// Presumably the body of stopped() (header not shown).
878 return status_.load(mem::relaxed) == Status::stopped;
// True when no tasks are outstanding (todo_ <= 0, which includes the error state).
881 bool done()
const {
return (todo_.load(mem::relaxed) <= 0); }
885 mem::aligned::vector<TaskQueue> queues_;
889 mem::aligned::relaxed_atomic<size_t> num_waiting_{ 0 };
890 mem::aligned::relaxed_atomic<size_t> push_idx_{ 0 };
// Outstanding task count; set strongly negative by report_fail().
891 mem::aligned::atomic<int> todo_{ 0 };
// Thread that constructed the manager; only it may wait or rethrow.
894 const std::thread::id owner_id_;
901 mem::aligned::atomic<Status> status_{ Status::running };
903 std::condition_variable cv_;
904 std::exception_ptr err_ptr_{
nullptr };
// Returns the ids of the CPUs available to the calling thread (presumably
// get_avail_cores(), which num_cores_avail() below calls).
// - On Linux it reads the calling thread's affinity mask and throws
//   std::runtime_error when pthread_getaffinity_np fails.
// - The non-Linux branch is not shown.
// NOTE(review): only ids below hardware_concurrency() are checked, so an
// affinity set that contains higher CPU ids (e.g. with offline CPUs) is
// reported incompletely.
908 inline std::vector<size_t>
911 auto ncores = std::thread::hardware_concurrency();
912 std::vector<size_t> avail_cores;
913 avail_cores.reserve(ncores);
914 #if (defined __linux__)
916 int rc = pthread_getaffinity_np(pthread_self(),
sizeof(cpu_set_t), &cpuset);
918 throw std::runtime_error(
"Error calling pthread_getaffinity_np");
920 for (
size_t id = 0;
id < ncores;
id++) {
921 if (CPU_ISSET(
id, &cpuset)) {
922 avail_cores.push_back(
id);
// Number of usable cores: the size of the affinity set on Linux, otherwise
// std::thread::hardware_concurrency(). The latter may return 0 when the count
// is unknown.
932 #if (defined __linux__)
933 return get_avail_cores().size();
935 return std::thread::hardware_concurrency();
// Constructs a pool with `threads` workers; the default is the number of
// available cores.
949 explicit ThreadPool(
size_t threads = sched::num_cores_avail())
950 : task_manager_{ threads }
// Presumably part of the destructor: stops the task manager (the surrounding
// lines are not shown).
957 task_manager_.stop();
// The pool is neither copyable nor movable.
961 ThreadPool(ThreadPool&&) =
delete;
962 ThreadPool(
const ThreadPool&) =
delete;
963 ThreadPool& operator=(
const ThreadPool&) =
delete;
964 ThreadPool& operator=(ThreadPool&& other) =
delete;
// set_active_threads (fragment):
// - Only proceeds on the owner thread (the early return is on a line not
//   shown).
// - Does nothing more when the requested count equals the current one.
// - Otherwise stops the existing workers if there are any, replaces the task
//   manager and starts `threads` new workers, presumably via add_worker(id).
// - On Linux the workers are then pinned to cores.
985 if (!task_manager_.called_from_owner_thread())
988 if (threads == active_threads_.load(mem::relaxed)) {
993 if (workers_.size() > 0) {
994 task_manager_.stop();
999 task_manager_ = quickpool::sched::TaskManager{ threads };
// Selects vector(size_type): `threads` default-constructed (non-running) threads.
1000 workers_ = std::vector<std::thread>{ threads };
1001 for (
size_t id = 0;
id < threads; ++id) {
1004 #if (defined __linux__)
1006 set_thread_affinity();
1009 active_threads_ = threads;
// Pushes a job. With no active worker threads the job runs immediately on the
// calling thread. Otherwise the call is packaged with detail::Task::make and
// enqueued (the first lines of that call are not shown).
1018 template<
class Function,
class... Args>
1019 void push(Function&& f, Args&&... args)
1021 if (active_threads_ == 0) {
1022 std::forward<Function>(f)(std::forward<Args>(args)...);
1026 std::forward<Function>(f), std::forward<Args>(args)...));
// Runs f(args...) through push() and returns a future for its result.
// std::packaged_task is move-only, while queued tasks are stored in a
// std::function (which requires copyable callables), so the packaged task is
// held in a shared_ptr captured by the pushed lambda.
1034 template<
class Function,
class... Args>
1035 auto async(Function&& f, Args&&... args)
1036 -> std::future<
typename detail::Task<Function, Args...>::type>
1039 std::forward<Function>(f), std::forward<Args>(args)...);
1040 using result_t =
typename detail::Task<Function, Args...>::type;
1041 using pack_t = std::packaged_task<result_t()>;
1042 auto task_ptr = std::make_shared<pack_t>(std::move(pack));
1043 this->
push([task_ptr] { (*task_ptr)(); });
1044 return task_ptr->get_future();
// parallel_for (fragment):
// - With no active threads, loops i over [begin, end) serially (the loop body,
//   presumably f(i), is not shown).
// - Otherwise splits the range across
//   n = min(max(active_threads, 1), num_tasks) workers and pushes one task per
//   worker. Each task shares ownership of the worker vector, so workers can
//   steal from each other.
// NOTE(review): `end - begin` is cast to size_t, which wraps if end < begin
// unless lines not shown here guard against it.
1056 template<
class UnaryFunction>
1062 const auto active_threads = active_threads_.load(mem::relaxed);
1063 if (active_threads == 0) {
1064 for (
auto i = begin; i < end; ++i) {
1072 const auto num_tasks =
static_cast<size_t>(end - begin);
1074 std::min(std::max(active_threads,
static_cast<size_t>(1)), num_tasks);
1075 auto workers = loop::create_workers<UnaryFunction>(f, begin, end, n);
1076 for (
size_t k = 0; k < n; k++) {
1077 this->
push([=] { workers->at(k).run(workers); });
// parallel_for_each (fragment):
// - Counts the items with std::distance.
// - Rejects ranges larger than INT_MAX with std::length_error, because loop
//   indices are int.
// - Dispatches on the iterator category to loop::parallel_for_each_impl.
1091 template<
class Items,
class UnaryFunction>
1094 auto begin = std::begin(items);
1095 auto size = std::distance(begin, std::end(items));
1099 if (size > std::numeric_limits<int>::max()) {
1100 throw std::length_error(
"parallel_for_each range is too large");
1102 typedef typename std::iterator_traits<decltype(begin)>::iterator_category
1104 loop::parallel_for_each_impl(begin,
1106 static_cast<int>(size),
1109 iterator_category{});
// Waits for the queued jobs by delegating to TaskManager::wait_for_finish(millis).
1116 void wait(
size_t millis = 0) { task_manager_.wait_for_finish(millis); }
// Error path (enclosing lines not shown): records the current exception, waits
// for outstanding work, then rethrows (rethrowing only happens on the owner
// thread).
1122 task_manager_.report_fail(std::current_exception());
1123 task_manager_.wait_for_finish();
1124 task_manager_.rethrow_exception();
1128 bool done()
const {
return task_manager_.done(); }
// Heap allocation honours alignof(ThreadPool), which includes the alignment
// of its over-aligned members.
// NOTE(review): this operator new is not noexcept, so it must throw rather
// than return nullptr if alloc() fails; any such check is on a line not shown.
1131 static void*
operator new(
size_t count)
1133 return mem::aligned::alloc(
alignof(
ThreadPool), count);
1137 static void operator delete(
void* ptr) { mem::aligned::free(ptr); }
// Joins every joinable worker thread (enclosing function not shown).
1143 for (
auto& worker : workers_) {
1144 if (worker.joinable())
// Starts worker thread `id`. The thread captures the pool by reference, so
// the pool must outlive it.
// - Until the task manager is stopped, it waits for jobs.
// - It then drains tasks from the queues (starting with queue `id`), repeating
//   while work remains outstanding. Some loop lines are not shown.
1151 void add_worker(
size_t id)
1153 workers_[id] = std::thread([&,
id] {
1154 std::function<void()> task;
1155 while (!task_manager_.stopped()) {
1156 task_manager_.wait_for_jobs(
id);
1159 while (task_manager_.try_pop(task,
id))
1160 this->execute_safely(task);
1161 }
while (!task_manager_.done());
// Linux only: pins worker i to available core i modulo the number of available
// cores. Throws std::runtime_error if pthread_setaffinity_np fails (the rc
// check and cpuset setup are on lines not shown).
// NOTE(review): an empty avail_cores list would make the modulo divide by zero.
1166 #if (defined __linux__)
1168 void set_thread_affinity()
1171 auto avail_cores = sched::get_avail_cores();
1172 for (
size_t id = 0;
id < workers_.size();
id++) {
1174 CPU_SET(avail_cores[
id % avail_cores.size()], &cpuset);
1175 int rc = pthread_setaffinity_np(
1176 workers_.at(
id).native_handle(),
sizeof(cpu_set_t), &cpuset);
1178 throw std::runtime_error(
1179 "Error calling pthread_setaffinity_np");
// Runs a task and reports success, or reports the caught exception to the task
// manager (the call and try/catch lines are not shown).
1185 void execute_safely(std::function<
void()>& task)
1189 task_manager_.report_success();
1191 task_manager_.report_fail(std::current_exception());
1195 sched::TaskManager task_manager_;
1196 std::vector<std::thread> workers_;
// Number of running worker threads; 0 makes push() run jobs inline on the caller.
1197 std::atomic_size_t active_threads_{ 0 };
// Free-standing API fragments. They forward their arguments, presumably to the
// global ThreadPool instance; the call sites are on lines not shown.
1207 template<
class Function,
class... Args>
1212 std::forward<Args>(args)...);
// async(): returns a future for the result of f(args...).
1220 template<
class Function,
class... Args>
1223 -> std::future<
typename detail::Task<Function, Args...>::type>
1226 std::forward<Args>(args)...);
// parallel_for(): forwards the index range and the function.
1270 template<
class UnaryFunction>
1275 begin, end, std::forward<UnaryFunction>(f));
// parallel_for_each(): forwards the item container and the function.
1287 template<
class Items,
class UnaryFunction>
1292 items, std::forward<UnaryFunction>(f));
1297 #undef QUICKPOOL_HAS_CPP17
A work-stealing thread pool.
void wait(size_t millis=0)
waits for all jobs currently running on the thread pool. Has no effect when called from threads other than the one that created the pool.
void push(Function &&f, Args &&... args)
pushes a job to the thread pool.
auto async(Function &&f, Args &&... args) -> std::future< typename detail::Task< Function, Args... >::type >
executes a job asynchronously on the thread pool.
void stop_and_reset()
Stops the pool, waits for all tasks to finish, and resets it to a neutral state.
size_t get_active_threads() const
retrieves the number of active worker threads in the thread pool.
void set_active_threads(size_t threads)
sets the number of active worker threads in the thread pool.
static ThreadPool & global_instance()
returns a reference to the global thread pool instance.
bool done() const
checks whether all jobs are done.
void parallel_for(int begin, int end, UnaryFunction f)
computes an index-based parallel for loop.
ThreadPool(size_t threads=sched::num_cores_avail())
constructs a thread pool.
void parallel_for_each(Items &items, UnaryFunction f)
computes an iterator-based parallel for loop.
void push(Function &&f, Args &&... args)
Free-standing functions (main API)
void wait()
waits for all jobs currently running on the global thread pool. Has no effect when not called from the main thread.
void parallel_for(int begin, int end, UnaryFunction &&f)
computes an index-based parallel for loop.
void parallel_for_each(Items &items, UnaryFunction &&f)
computes an iterator-based parallel for loop.
void set_active_threads(size_t threads)
sets the number of active worker threads in the global thread pool.
size_t get_active_threads()
retrieves the number of active worker threads in the global thread pool.
auto async(Function &&f, Args &&... args) -> std::future< typename detail::Task< Function, Args... >::type >
executes a job asynchronously on the global thread pool.
bool done()
checks whether all jobs on the global thread pool are done.