#include <condition_variable>

#if (defined __linux__ || defined AFFINITY)
#include <pthread.h>
#include <sched.h>
#endif
static constexpr std::memory_order relaxed = std::memory_order_relaxed;
static constexpr std::memory_order acquire = std::memory_order_acquire;
static constexpr std::memory_order release = std::memory_order_release;
static constexpr std::memory_order seq_cst = std::memory_order_seq_cst;
namespace padding_impl {

// integer modulo usable in constant expressions
constexpr size_t
mod(size_t a, size_t b)
{
    return a - b * (a / b);
}
// padding bytes that round the size of an atomic up to a multiple of Align
template<class T, size_t Align>
struct padding_bytes
{
    static constexpr size_t free_space =
      Align - mod(sizeof(std::atomic<T>), Align);
    static constexpr size_t required = free_space > 1 ? free_space : 1;
    char padding_[required];
};
struct empty_struct
{};

// padding is only added when the atomic does not already fill a multiple of
// Align bytes
template<class T, size_t Align>
struct padding
  : std::conditional<mod(sizeof(std::atomic<T>), Align) != 0,
                     padding_bytes<T, Align>,
                     empty_struct>::type
{};

} // namespace padding_impl
// aligned malloc replacement: stores the pointer returned by malloc just
// before the aligned block so that free() can recover it
inline void*
alloc(size_t alignment, size_t size) noexcept
{
    // align at least to the size of a pointer
    alignment = (alignment >= alignof(void*)) ? alignment : alignof(void*);
    // request extra space for the alignment shift and the original pointer
    size_t space = size + alignment + sizeof(void*);
    void* p = std::malloc(space);
    if (p == nullptr)
        return nullptr;

    // shift the block so there is room to store the original pointer in front
    void* p_algn = static_cast<char*>(p) + sizeof(void*);
    space -= sizeof(void*);
    (void)std::align(alignment, size, p_algn, space);

    // remember the original pointer for later free()
    *(static_cast<void**>(p_algn) - 1) = p;
    return p_algn;
}
// counterpart to aligned::alloc(): frees via the stored original pointer
inline void
free(void* ptr) noexcept
{
    std::free(*(static_cast<void**>(ptr) - 1));
}
//! An allocator that returns over-aligned memory (64-byte aligned by default).
template<class T, std::size_t Alignment = 64>
class allocator : public std::allocator<T>
{
    static constexpr size_t min_align =
      (Alignment >= alignof(void*)) ? Alignment : alignof(void*);

  public:
    template<class U>
    struct rebind
    {
        typedef allocator<U, Alignment> other;
    };
    allocator() noexcept
      : std::allocator<T>()
    {}
    template<class U, std::size_t UAlignment>
    allocator(const allocator<U, UAlignment>& other) noexcept
      : std::allocator<T>(other)
    {}
    T* allocate(size_t size, const void* = 0)
    {
        void* p = mem::aligned::alloc(min_align, sizeof(T) * size);
        if (p == nullptr)
            throw std::bad_alloc();
        return static_cast<T*>(p);
    }

    void deallocate(T* ptr, size_t) { mem::aligned::free(ptr); }
    template<class U, class... Args>
    void construct(U* ptr, Args&&... args)
    {
        ::new (static_cast<void*>(ptr)) U(std::forward<Args>(args)...);
    }

    template<class U>
    void destroy(U* ptr) noexcept
    {
        ptr->~U();
    }
};
//! An aligned and padded atomic: alignas() prevents misaligned placement and
//! the padding avoids false sharing with neighboring objects.
template<class T, size_t Align = 64>
struct alignas(Align) atomic
  : public std::atomic<T>
  , private padding_impl::padding<T, Align>
{
  public:
    atomic() noexcept = default;

    atomic(T desired) noexcept
      : std::atomic<T>(desired)
    {}

    T operator=(T x) noexcept { return std::atomic<T>::operator=(x); }
    T operator=(T x) volatile noexcept { return std::atomic<T>::operator=(x); }

    static void* operator new(size_t count) noexcept
    {
        return mem::aligned::alloc(Align, count);
    }

    static void operator delete(void* ptr) { mem::aligned::free(ptr); }
};
//! An aligned atomic whose conversion and assignment use relaxed memory order.
template<class T>
struct relaxed_atomic : public mem::aligned::atomic<T>
{
    explicit relaxed_atomic(T value)
      : mem::aligned::atomic<T>(value)
    {}

    operator T() const noexcept { return this->load(mem::relaxed); }

    T operator=(T desired) noexcept
    {
        this->store(desired, mem::relaxed);
        return desired;
    }
};
//! A vector whose elements are aligned to (at least) 64 bytes.
template<class T, size_t Alignment = 64>
using vector = std::vector<T, mem::aligned::allocator<T, Alignment>>;
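//! Usage sketch (illustration only, not part of the header): two counters in a
//! mem::aligned::vector of 64-byte aligned atomics sit on separate cache
//! lines, so threads incrementing different counters do not invalidate each
//! other's cache line (no false sharing).
//! \code
//! #include <thread>
//!
//! void counter_demo()
//! {
//!     mem::aligned::vector<mem::aligned::atomic<int>> counters(2);
//!     std::thread t1([&] { for (int i = 0; i < 1000; ++i) counters[0]++; });
//!     std::thread t2([&] { for (int i = 0; i < 1000; ++i) counters[1]++; });
//!     t1.join();
//!     t2.join();
//! }
//! \endcode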
//! the index range [pos, end) a worker is responsible for.
struct State
{
    int pos;
    int end;
};

//! A worker processes its own index range and steals from others when done.
template<typename Function>
struct Worker
{
    Worker(int begin, int end, Function fun)
      : state{ State{ begin, end } }
      , f{ fun }
    {}

    Worker(Worker&& other)
      : state{ other.state.load() }
      , f{ std::forward<Function>(other.f) }
    {}
    //! number of tasks left in the worker's range.
    size_t tasks_left() const
    {
        State s = state.load();
        return s.end - s.pos;
    }

    //! checks whether the worker has any tasks left.
    bool done() const { return (tasks_left() == 0); }
    //! runs the worker's own range, then steals from others when done.
    void run(std::shared_ptr<mem::aligned::vector<Worker>> others)
    {
        State s, s_old;
        do {
            s = s_old = state.load();
            if (s.pos < s.end) {
                // claim the next index before executing the task
                s.pos++;
                if (state.compare_exchange_weak(s_old, s)) {
                    f(s_old.pos);
                    if (s.pos == s.end) {
                        // own range exhausted, try to steal a new one
                        this->steal_range(*others);
                    }
                }
            }
        } while (!this->done());
    }
    //! steals half of the largest remaining range of another worker.
    void steal_range(mem::aligned::vector<Worker>& workers)
    {
        do {
            Worker& other = find_victim(workers);
            State s = other.state.load();
            State s_old = s;
            if (s.pos >= s.end) {
                continue; // other range is empty by now, look again
            }
            // remove the upper half of the victim's range ...
            s.end -= (s.end - s.pos + 1) / 2;
            if (other.state.compare_exchange_weak(s_old, s)) {
                // ... and make it our own range
                state = State{ s.end, s_old.end };
                return;
            }
        } while (!all_done(workers));
    }
    //! checks whether all workers are done.
    bool all_done(const mem::aligned::vector<Worker>& workers)
    {
        for (const auto& worker : workers) {
            if (!worker.done())
                return false;
        }
        return true;
    }
    //! selects the worker with the most tasks left as stealing victim.
    Worker& find_victim(mem::aligned::vector<Worker>& workers)
    {
        std::vector<size_t> tasks_left;
        tasks_left.reserve(workers.size());
        for (const auto& worker : workers) {
            tasks_left.push_back(worker.tasks_left());
        }
        auto max_it = std::max_element(tasks_left.begin(), tasks_left.end());
        auto idx = std::distance(tasks_left.begin(), max_it);
        return workers[idx];
    }

    mem::aligned::relaxed_atomic<State> state;
    Function f;
};
//! splits the range [begin, end) evenly across num_workers workers.
template<typename Function>
std::shared_ptr<mem::aligned::vector<Worker<Function>>>
create_workers(const Function& f, int begin, int end, size_t num_workers)
{
    auto num_tasks = std::max(end - begin, static_cast<int>(0));
    num_workers = std::max(num_workers, static_cast<size_t>(1));
    auto workers = new mem::aligned::vector<Worker<Function>>;
    workers->reserve(num_workers);
    for (size_t i = 0; i < num_workers; i++) {
        workers->emplace_back(begin + num_tasks * i / num_workers,
                              begin + num_tasks * (i + 1) / num_workers,
                              f);
    }
    return std::shared_ptr<mem::aligned::vector<Worker<Function>>>(workers);
}
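//! Range-splitting sketch (illustration only): for begin = 0, end = 10 and
//! three workers, the formula above yields the ranges [0, 3), [3, 6) and
//! [6, 10); a worker that finishes its range calls steal_range() and takes
//! the upper half of the largest remaining range of another worker.
//! \code
//! #include <cstdio>
//!
//! void print_ranges(int begin, int end, int num_workers)
//! {
//!     int num_tasks = end - begin;
//!     for (int i = 0; i < num_workers; ++i) {
//!         std::printf("worker %d: [%d, %d)\n",
//!                     i,
//!                     begin + num_tasks * i / num_workers,
//!                     begin + num_tasks * (i + 1) / num_workers);
//!     }
//! }
//! \endcode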
//! A simple ring buffer; the capacity must be a power of two.
template<typename T>
class RingBuffer
{
  public:
    explicit RingBuffer(size_t capacity)
      : buffer_{ std::unique_ptr<T[]>(new T[capacity]) }
      , capacity_{ capacity }
      , mask_{ capacity - 1 }
    {}

    size_t capacity() const { return capacity_; }

    void set_entry(size_t i, T val) { buffer_[i & mask_] = val; }

    T get_entry(size_t i) const { return buffer_[i & mask_]; }

    //! copies the entries in [top, bottom) into a buffer of twice the size.
    RingBuffer<T>* enlarged_copy(size_t bottom, size_t top) const
    {
        RingBuffer<T>* new_buffer = new RingBuffer{ 2 * capacity_ };
        for (size_t i = top; i != bottom; ++i)
            new_buffer->set_entry(i, this->get_entry(i));
        return new_buffer;
    }

  private:
    std::unique_ptr<T[]> buffer_;
    size_t capacity_;
    size_t mask_;
};
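//! Indexing sketch (illustration only): entries wrap around via the bit mask
//! `i & mask_`, which equals `i % capacity_` only when the capacity is a
//! power of two (e.g. the default queue capacity of 256, where mask_ = 255).
//! \code
//! RingBuffer<int> buf(8);   // capacity 8, mask_ = 7
//! buf.set_entry(9, 42);     // stored in slot 9 & 7 == 1
//! int x = buf.get_entry(1); // x == 42
//! \endcode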
using Task = std::function<void()>;

//! A task queue: the owning thread pushes at the bottom, any thread can pop
//! from the top.
class TaskQueue
{
  public:
    explicit TaskQueue(size_t capacity = 256)
      : buffer_{ new RingBuffer<Task*>(capacity) }
    {}

    ~TaskQueue() noexcept
    {
        // delete remaining tasks and the current buffer
        auto buf_ptr = buffer_.load();
        for (int i = top_; i < bottom_.load(mem::relaxed); ++i)
            delete buf_ptr->get_entry(i);
        delete buf_ptr;
    }

    TaskQueue(TaskQueue const& other) = delete;
    TaskQueue& operator=(TaskQueue const& other) = delete;

    //! checks whether the queue is empty.
    bool empty() const
    {
        return (bottom_.load(mem::relaxed) <= top_.load(mem::relaxed));
    }

    //! pushes a task to the bottom of the queue (called by the owner only).
    void push(Task&& task)
    {
        std::unique_lock<std::mutex> lk(mutex_);
        auto b = bottom_.load(mem::relaxed);
        auto t = top_.load(mem::acquire);
        RingBuffer<Task*>* buf_ptr = buffer_.load(mem::relaxed);

        if (static_cast<int>(buf_ptr->capacity()) < (b - t) + 1) {
            // buffer is full: replace it with an enlarged copy, but keep the
            // old buffer alive because concurrent pops may still read from it
            auto old_buf = buf_ptr;
            buf_ptr = buf_ptr->enlarged_copy(b, t);
            old_buffers_.emplace_back(old_buf);
            buffer_.store(buf_ptr, mem::relaxed);
        }

        buf_ptr->set_entry(b, new Task{ std::forward<Task>(task) });
        bottom_.store(b + 1, mem::release);
        lk.unlock();
        cv_.notify_one();
    }

    //! tries to pop a task from the top of the queue (any thread may call).
    bool try_pop(Task& task)
    {
        auto t = top_.load(mem::acquire);
        std::atomic_thread_fence(mem::seq_cst);
        auto b = bottom_.load(mem::acquire);
        if (t >= b)
            return false; // queue is empty

        auto task_ptr = buffer_.load(mem::acquire)->get_entry(t);
        // race with other consumers for the slot at the top
        if (top_.compare_exchange_strong(t, t + 1, mem::seq_cst, mem::relaxed)) {
            task = std::move(*task_ptr);
            delete task_ptr;
            return true;
        }
        return false; // lost the race
    }

    //! waits until the queue is non-empty or stopped.
    void wait()
    {
        std::unique_lock<std::mutex> lk(mutex_);
        cv_.wait(lk, [this] { return !this->empty() || stopped_; });
    }

    //! stops the queue and wakes up all threads waiting on it.
    void stop()
    {
        std::lock_guard<std::mutex> lk(mutex_);
        stopped_ = true;
        cv_.notify_all();
    }

    //! wakes up threads waiting on the queue without stopping it.
    void wake_up()
    {
        std::lock_guard<std::mutex> lk(mutex_);
        cv_.notify_all();
    }

  private:
    mem::aligned::atomic<int> top_{ 0 };
    mem::aligned::atomic<int> bottom_{ 0 };
    std::atomic<RingBuffer<Task*>*> buffer_{ nullptr };
    std::vector<std::unique_ptr<RingBuffer<Task*>>> old_buffers_;

    std::mutex mutex_;
    std::condition_variable cv_;
    bool stopped_{ false };
};
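//! Usage sketch (illustration only): the owning thread pushes tasks at the
//! bottom, while any thread may take tasks from the top with try_pop(); a
//! failed compare-exchange inside try_pop() simply means another thread won
//! the race for that task.
//! \code
//! TaskQueue q;
//! q.push([] { /* do work */ });
//!
//! Task t;
//! if (q.try_pop(t))
//!     t(); // run the task on the calling thread
//! \endcode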
//! Manages tasks: distributes them across multiple queues and synchronizes
//! the workers.
class TaskManager
{
  public:
    explicit TaskManager(size_t num_queues)
      : queues_(num_queues)
      , num_queues_(num_queues)
      , owner_id_(std::this_thread::get_id())
    {}

    TaskManager& operator=(TaskManager&& other)
    {
        std::swap(queues_, other.queues_);
        num_queues_ = other.num_queues_;
        status_ = other.status_.load();
        num_waiting_ = other.num_waiting_.load();
        push_idx_ = other.push_idx_.load();
        todo_ = other.todo_.load();
        return *this;
    }
    //! adjusts the number of queues; keeps the existing queues if possible.
    void resize(size_t num_queues)
    {
        num_queues_ = std::max(num_queues, static_cast<size_t>(1));
        if (num_queues > queues_.size()) {
            queues_ = mem::aligned::vector<TaskQueue>(num_queues);
        }
        status_ = Status::running;
    }
    //! pushes a task to one of the queues (round robin).
    template<typename Task>
    void push(Task&& task)
    {
        todo_.fetch_add(1, mem::release);
        queues_[push_idx_++ % num_queues_].push(task);
    }

    //! tries to pop a task, checking the worker's own queue first.
    template<typename Task>
    bool try_pop(Task& task, size_t worker_id = 0)
    {
        for (size_t k = 0; k <= num_queues_; k++) {
            if (queues_[(worker_id + k) % num_queues_].try_pop(task)) {
                return true;
            }
        }
        return false;
    }
    //! wakes up all workers waiting on their queues.
    void wake_up_all_workers()
    {
        for (auto& q : queues_)
            q.wake_up();
    }

    //! lets worker `id` wait for jobs to arrive in its queue.
    void wait_for_jobs(size_t id)
    {
        {
            std::lock_guard<std::mutex> lk(mtx_);
            if (++num_waiting_ == queues_.size())
                cv_.notify_all(); // all workers are idle now
        }
        queues_[id].wait();
        --num_waiting_;
    }
    //! waits until all jobs are done or the manager errored/stopped.
    void wait_for_finish(size_t millis = 0)
    {
        if (called_from_owner_thread() && is_running()) {
            auto wake_up = [this] { return (todo_ <= 0) || !is_running(); };
            std::unique_lock<std::mutex> lk(mtx_);
            if (millis == 0) {
                cv_.wait(lk, wake_up);
            } else {
                cv_.wait_for(lk, std::chrono::milliseconds(millis), wake_up);
            }
        }
        rethrow_exception();
    }
    //! checks whether the caller is the thread that constructed the manager.
    bool called_from_owner_thread() const
    {
        return (std::this_thread::get_id() == owner_id_);
    }

    //! decrements the job count and notifies waiters when it reaches zero.
    void report_success()
    {
        auto n = todo_.fetch_sub(1, mem::release) - 1;
        if (n == 0) {
            // lock and release before notifying to avoid a lost wake-up
            {
                std::lock_guard<std::mutex> lk{ mtx_ };
            }
            cv_.notify_all();
        }
    }
    //! stores the first exception and puts the manager into an error state.
    void report_fail(std::exception_ptr err_ptr)
    {
        std::lock_guard<std::mutex> lk(mtx_);
        if (has_errored()) // only remember the first exception
            return;
        err_ptr_ = err_ptr;
        status_ = Status::errored;

        // unblock threads waiting for the job count to reach zero
        todo_.store(std::numeric_limits<int>::min() / 2);
        cv_.notify_all();
    }

    //! stops the manager and all task queues.
    void stop()
    {
        std::lock_guard<std::mutex> lk(mtx_);
        status_ = Status::stopped;
        // worker threads wait on their own queues, so stop each of them
        for (auto& q : queues_)
            q.stop();
        cv_.notify_all();
    }
    //! rethrows a stored exception on the owner thread and resets the manager.
    void rethrow_exception()
    {
        if (called_from_owner_thread() && has_errored()) {
            // wait until all workers are idle before resetting the state
            std::unique_lock<std::mutex> lk(mtx_);
            cv_.wait(lk, [this] { return num_waiting_ == queues_.size(); });

            // copy the exception, reset the error state, then rethrow
            auto current_exception = err_ptr_;
            err_ptr_ = nullptr;
            status_ = Status::running;
            todo_.store(0);
            lk.unlock();

            std::rethrow_exception(current_exception);
        }
    }
    bool is_running() const
    {
        return status_.load(mem::relaxed) == Status::running;
    }

    bool has_errored() const
    {
        return status_.load(mem::relaxed) == Status::errored;
    }

    bool stopped() const
    {
        return status_.load(mem::relaxed) == Status::stopped;
    }

    bool done() const { return (todo_.load(mem::relaxed) <= 0); }
  private:
    //! queues and scheduling state
    mem::aligned::vector<TaskQueue> queues_;
    size_t num_queues_;
    mem::aligned::relaxed_atomic<size_t> num_waiting_{ 0 };
    mem::aligned::relaxed_atomic<size_t> push_idx_{ 0 };
    mem::aligned::atomic<int> todo_{ 0 };

    //! the thread that constructed the manager (and owns the pool)
    const std::thread::id owner_id_;

    //! synchronization and error state
    enum class Status { running, errored, stopped };
    mem::aligned::atomic<Status> status_{ Status::running };
    std::mutex mtx_;
    std::condition_variable cv_;
    std::exception_ptr err_ptr_{ nullptr };
};
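//! Scheduling sketch (illustration only): push() spreads tasks over the
//! queues round-robin via push_idx_, and try_pop() scans the queues starting
//! at the worker's own; for num_queues_ = 4 and worker_id = 2 the scan order
//! is 2, 3, 0, 1 (then 2 again), so a worker only steals once its own queue
//! is empty.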
//! returns the ids of the cores available to the current process.
inline std::vector<size_t>
get_avail_cores()
{
    auto ncores = std::thread::hardware_concurrency();
    std::vector<size_t> avail_cores;
    avail_cores.reserve(ncores);
#if (defined __linux__)
    // on Linux, respect the process affinity mask
    cpu_set_t cpuset;
    int rc = pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    if (rc != 0) {
        throw std::runtime_error("Error calling pthread_getaffinity_np");
    }
    for (size_t id = 0; id < ncores; id++) {
        if (CPU_ISSET(id, &cpuset)) {
            avail_cores.push_back(id);
        }
    }
#else
    // otherwise assume all hardware threads are available
    for (size_t id = 0; id < ncores; id++)
        avail_cores.push_back(id);
#endif
    return avail_cores;
}
//! returns the number of cores available to the current process.
inline size_t
num_cores_avail()
{
#if (defined __linux__)
    return get_avail_cores().size();
#else
    return std::thread::hardware_concurrency();
#endif
}
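//! Usage sketch (illustration only): on Linux, num_cores_avail() honors the
//! process affinity mask (e.g. set via taskset or a cgroup cpuset) rather
//! than reporting all hardware threads.
//! \code
//! auto cores = sched::get_avail_cores(); // core ids usable by this process
//! auto n = sched::num_cores_avail();     // equals cores.size() on Linux
//! \endcode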
//! A work stealing thread pool.
class ThreadPool
{
  public:
    //! constructs a thread pool with the given number of worker threads.
    explicit ThreadPool(size_t threads = sched::num_cores_avail())
      : task_manager_{ threads }
    {
        set_active_threads(threads);
    }

    ~ThreadPool()
    {
        task_manager_.stop();
        join_threads();
    }

    ThreadPool(ThreadPool&&) = delete;
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool& operator=(ThreadPool&& other) = delete;
    //! sets the number of active worker threads.
    void set_active_threads(size_t threads)
    {
        if (!task_manager_.called_from_owner_thread())
            return;

        if (threads <= workers_.size()) {
            // enough threads already exist; only adjust the active queues
            task_manager_.resize(threads);
        } else {
            // need more threads: stop and restart with a larger pool
            if (workers_.size() > 0) {
                task_manager_.stop();
                join_threads();
            }
            workers_ = std::vector<std::thread>{ threads };
            task_manager_ = quickpool::sched::TaskManager{ threads };
            for (size_t id = 0; id < threads; ++id) {
                add_worker(id);
            }
#if (defined __linux__)
            set_thread_affinity();
#endif
        }
        active_threads_ = threads;
    }
    //! pushes a job to the thread pool.
    template<class Function, class... Args>
    void push(Function&& f, Args&&... args)
    {
        if (active_threads_ == 0) {
            f(args...); // no workers: run synchronously on this thread
            return;
        }
        task_manager_.push(
          std::bind(std::forward<Function>(f), std::forward<Args>(args)...));
    }
    //! executes a job asynchronously and returns a future for its result.
    template<class Function, class... Args>
    auto async(Function&& f, Args&&... args)
      -> std::future<decltype(f(args...))>
    {
        auto pack =
          std::bind(std::forward<Function>(f), std::forward<Args>(args)...);
        using pack_t = std::packaged_task<decltype(f(args...))()>;
        auto task_ptr = std::make_shared<pack_t>(std::move(pack));
        this->push([task_ptr] { (*task_ptr)(); });
        return task_ptr->get_future();
    }
    //! computes an index-based parallel for loop.
    template<class UnaryFunction>
    void parallel_for(int begin, int end, UnaryFunction f)
    {
        // one worker per active thread; each gets a chunk of the range and
        // steals from the others when done with its own
        auto n = std::max(static_cast<int>(active_threads_.load()), 1);
        auto workers = loop::create_workers<UnaryFunction>(f, begin, end, n);
        for (int k = 0; k < n; k++) {
            this->push([=] { workers->at(k).run(workers); });
        }
    }
    //! computes an iterator-based parallel for loop.
    template<class Items, class UnaryFunction>
    void parallel_for_each(Items& items, UnaryFunction f)
    {
        auto begin = std::begin(items);
        auto size = std::distance(begin, std::end(items));
        this->parallel_for(
          0, static_cast<int>(size), [=](int i) { f(begin[i]); });
    }
    //! waits for all jobs currently running on the thread pool (at most
    //! `millis` milliseconds when a non-zero timeout is given).
    void wait(size_t millis = 0) { task_manager_.wait_for_finish(millis); }

    //! checks whether all jobs are done.
    bool done() const { return task_manager_.done(); }
    //! ensures that heap-allocated pools are properly aligned.
    static void* operator new(size_t count)
    {
        return mem::aligned::alloc(alignof(ThreadPool), count);
    }

    static void operator delete(void* ptr) { mem::aligned::free(ptr); }
  private:
    //! joins all worker threads.
    void join_threads()
    {
        for (auto& worker : workers_) {
            if (worker.joinable())
                worker.join();
        }
    }
    //! spawns worker thread `id`: it waits for jobs, then drains the queues.
    void add_worker(size_t id)
    {
        workers_[id] = std::thread([&, id] {
            std::function<void()> task;
            while (!task_manager_.stopped()) {
                task_manager_.wait_for_jobs(id);
                do {
                    // inner loop avoids going back to sleep while work remains
                    while (task_manager_.try_pop(task, id))
                        this->execute_safely(task);
                } while (!task_manager_.done());
            }
        });
    }
#if (defined __linux__)
    //! pins each worker thread to one of the available cores.
    void set_thread_affinity()
    {
        cpu_set_t cpuset;
        auto avail_cores = sched::get_avail_cores();
        for (size_t id = 0; id < workers_.size(); id++) {
            CPU_ZERO(&cpuset);
            CPU_SET(avail_cores[id % avail_cores.size()], &cpuset);
            int rc = pthread_setaffinity_np(
              workers_.at(id).native_handle(), sizeof(cpu_set_t), &cpuset);
            if (rc != 0) {
                throw std::runtime_error(
                  "Error calling pthread_setaffinity_np");
            }
        }
    }
#endif
    //! runs a task and reports success or failure to the task manager.
    void execute_safely(std::function<void()>& task)
    {
        try {
            task();
            task_manager_.report_success();
        } catch (...) {
            task_manager_.report_fail(std::current_exception());
        }
    }

    sched::TaskManager task_manager_;
    std::vector<std::thread> workers_;
    std::atomic_size_t active_threads_;
};
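//! Usage sketch (illustration only): a locally constructed pool; jobs are
//! pushed from the owning thread and wait() blocks until all of them have
//! finished.
//! \code
//! #include <atomic>
//!
//! void pool_demo()
//! {
//!     ThreadPool pool(2); // two worker threads
//!     std::atomic<int> sum{ 0 };
//!     for (int i = 1; i <= 100; ++i)
//!         pool.push([&sum, i] { sum += i; });
//!     pool.wait(); // sum == 5050 from here on
//! }
//! \endcode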
// free-standing functions that operate on the global thread pool

//! pushes a job to the global thread pool.
template<class Function, class... Args>
inline void
push(Function&& f, Args&&... args)
{
    ThreadPool::global_instance().push(std::forward<Function>(f),
                                       std::forward<Args>(args)...);
}
//! executes a job asynchronously on the global thread pool.
template<class Function, class... Args>
inline auto
async(Function&& f, Args&&... args) -> std::future<decltype(f(args...))>
{
    return ThreadPool::global_instance().async(std::forward<Function>(f),
                                               std::forward<Args>(args)...);
}
//! computes an index-based parallel for loop on the global thread pool.
template<class UnaryFunction>
inline void
parallel_for(int begin, int end, UnaryFunction&& f)
{
    ThreadPool::global_instance().parallel_for(
      begin, end, std::forward<UnaryFunction>(f));
}
//! computes an iterator-based parallel for loop on the global thread pool.
template<class Items, class UnaryFunction>
inline void
parallel_for_each(Items& items, UnaryFunction&& f)
{
    ThreadPool::global_instance().parallel_for_each(
      items, std::forward<UnaryFunction>(f));
}
A work stealing thread pool.

ThreadPool(size_t threads=sched::num_cores_avail())
constructs a thread pool.
void push(Function &&f, Args &&... args)
pushes a job to the thread pool.
auto async(Function &&f, Args &&... args) -> std::future< decltype(f(args...))>
executes a job asynchronously on the thread pool.
void parallel_for(int begin, int end, UnaryFunction f)
computes an index-based parallel for loop.
void parallel_for_each(Items &items, UnaryFunction f)
computes an iterator-based parallel for loop.
void wait(size_t millis=0)
waits for all jobs currently running on the thread pool. Has no effect when called from threads other than the one that constructed the pool.
bool done() const
checks whether all jobs are done.
size_t get_active_threads() const
retrieves the number of active worker threads in the thread pool.
void set_active_threads(size_t threads)
sets the number of active worker threads in the thread pool.
static ThreadPool & global_instance()
returns a reference to the global thread pool instance.

Free-standing functions (main API)

void push(Function &&f, Args &&... args)
pushes a job to the global thread pool.
auto async(Function &&f, Args &&... args) -> std::future< decltype(f(args...))>
executes a job asynchronously on the global thread pool.
void parallel_for(int begin, int end, UnaryFunction &&f)
computes an index-based parallel for loop.
void parallel_for_each(Items &items, UnaryFunction &&f)
computes an iterator-based parallel for loop.
void wait()
waits for all jobs currently running on the global thread pool. Has no effect when not called from the main thread.
bool done()
checks whether all global jobs are done.
size_t get_active_threads()
retrieves the number of active worker threads in the global thread pool.
void set_active_threads(size_t threads)
sets the number of active worker threads in the global thread pool.
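A minimal end-to-end sketch of the free-standing API follows; it assumes the header is included as quickpool.hpp and uses only the functions listed above.

    #include "quickpool.hpp"
    #include <vector>

    int main()
    {
        std::vector<double> x(1000, 1.0);

        // index-based parallel loop on the global thread pool
        quickpool::parallel_for(0, static_cast<int>(x.size()),
                                [&](int i) { x[i] *= 2; });

        // asynchronous job returning a future
        auto fut = quickpool::async([] { return 40 + 2; });

        quickpool::wait(); // wait for all jobs pushed so far
        return (fut.get() == 42) ? 0 : 1;
    }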