Make threadpool implement executor interface

This commit is contained in:
Nikita Lisitsa 2020-11-19 23:48:50 +03:00
parent 8bfc6cfd72
commit daf1a3c5f4
3 changed files with 81 additions and 50 deletions

View file

@ -270,7 +270,7 @@ struct node_controller
std::size_t node_count() const { return node_count_; } std::size_t node_count() const { return node_count_; }
std::size_t loader_queue_size() const { return loader_.queue_size(); } std::size_t loader_queue_size() const { return loader_.task_count(); }
void preload(int max_level); void preload(int max_level);

View file

@ -3,6 +3,7 @@
#include <psemek/util/thread.hpp> #include <psemek/util/thread.hpp>
#include <psemek/util/synchronyzed_queue.hpp> #include <psemek/util/synchronyzed_queue.hpp>
#include <psemek/util/movable_function.hpp> #include <psemek/util/movable_function.hpp>
#include <psemek/util/executor.hpp>
#include <future> #include <future>
#include <vector> #include <vector>
@ -14,58 +15,35 @@ namespace psemek::util
{ {
struct threadpool struct threadpool
: executor
{ {
threadpool(std::string const & name) threadpool(std::string const & name)
: threadpool(name, std::max(1u, std::thread::hardware_concurrency())) : threadpool(name, std::max(1u, std::thread::hardware_concurrency()))
{} {}
threadpool(std::string const & name, std::size_t thread_count) threadpool(std::string const & name, std::size_t thread_count);
: name(name)
, working_count_{0}
{
start(thread_count);
}
~ threadpool() ~threadpool() override { stop(); }
{
stop();
}
template <typename F> void post(task t) override;
auto dispatch(F && f)
{
using R = decltype(f());
std::packaged_task<R()> task{std::forward<F>(f)}; void stop() override;
auto result = task.get_future(); void wait() override;
tasks_queue.push(std::move(task)); void wait_for(clock::duration period) override;
return result; void wait_until(clock::time_point time) override;
}
void start(std::size_t thread_count); std::size_t task_count() const override;
void stop();
void wait()
{
tasks_queue.wait();
std::unique_lock lock{working_count_mutex_};
working_count_cv_.wait(lock, [this]{ return working_count_ == 0; });
}
std::size_t queue_size() const { return tasks_queue.size(); }
private: private:
std::string const name; std::vector<util::thread> threads_;
std::vector<util::thread> threads; util::synchronized_queue<task> task_queue_;
util::synchronized_queue<movable_function<void()>> tasks_queue; std::atomic<bool> running_;
std::size_t working_count_; std::size_t working_count_;
std::mutex working_count_mutex_; mutable std::mutex working_count_mutex_;
std::condition_variable working_count_cv_; std::condition_variable working_count_cv_;
}; };

View file

@ -1,31 +1,55 @@
#include <psemek/util/threadpool.hpp> #include <psemek/util/threadpool.hpp>
#include <psemek/util/unused.hpp> #include <psemek/util/unused.hpp>
#include <psemek/util/to_string.hpp> #include <psemek/util/to_string.hpp>
#include <psemek/util/at_scope_exit.hpp>
#include <psemek/log/log.hpp> #include <psemek/log/log.hpp>
namespace psemek::util namespace psemek::util
{ {
void threadpool::start(std::size_t thread_count) namespace
{
struct stop_execution{};
}
threadpool::threadpool(std::string const & name, std::size_t thread_count)
: running_{true}
, working_count_{0}
{ {
for (std::size_t th = 0; th < thread_count; ++th) for (std::size_t th = 0; th < thread_count; ++th)
{ {
threads.emplace_back([this, th] threads_.emplace_back([this, &name, th, thread_count]
{ {
log::register_thread(to_string(name, '#', th)); log::register_thread(thread_count == 1 ? name : to_string(name, '#', th));
while (true) for (bool running = true; running;)
{ {
auto task = tasks_queue.pop(); auto task = task_queue_.pop();
if (!task)
break;
{ {
std::lock_guard lock{working_count_mutex_}; std::lock_guard lock{working_count_mutex_};
++working_count_; ++working_count_;
} }
task();
try
{
task();
}
catch (stop_execution const &)
{
running = false;
}
catch (std::exception const & e)
{
log::error() << "Unhandled exception in threadpool executor: " << e.what();
}
catch (...)
{
log::error() << "Unhandled unknown exception in threadpool executor";
}
{ {
std::lock_guard lock{working_count_mutex_}; std::lock_guard lock{working_count_mutex_};
--working_count_; --working_count_;
@ -36,15 +60,44 @@ namespace psemek::util
} }
} }
void threadpool::post(task t)
{
task_queue_.push(std::move(t));
}
void threadpool::stop() void threadpool::stop()
{ {
tasks_queue.clear(); task_queue_.clear();
for (auto const & thread: threads) for (auto const & thread: threads_)
{ {
unused(thread); unused(thread);
tasks_queue.push({}); task_queue_.push([]{ throw stop_execution{}; });
} }
threads.clear(); threads_.clear();
}
void threadpool::wait()
{
std::unique_lock lock{working_count_mutex_};
working_count_cv_.wait(lock, [this]{ return working_count_ == 0 && task_queue_.size() == 0; });
}
void threadpool::wait_for(clock::duration period)
{
std::unique_lock lock{working_count_mutex_};
working_count_cv_.wait_for(lock, period, [this]{ return working_count_ == 0 && task_queue_.size() == 0; });
}
void threadpool::wait_until(clock::time_point time)
{
std::unique_lock lock{working_count_mutex_};
working_count_cv_.wait_until(lock, time, [this]{ return working_count_ == 0 && task_queue_.size() == 0; });
}
std::size_t threadpool::task_count() const
{
std::unique_lock lock{working_count_mutex_};
return task_queue_.size() + working_count_;
} }
} }