// psemek/libs/async/source/threadpool.cpp
//
// 113 lines
// 2.4 KiB
// C++

#include <psemek/async/threadpool.hpp>
#include <psemek/util/unused.hpp>
#include <psemek/util/to_string.hpp>
#include <psemek/log/log.hpp>
namespace psemek::async
{
namespace
{
// Tag type thrown by the tasks that threadpool::stop() enqueues. The worker
// loop in the threadpool constructor catches it and leaves its loop instead
// of logging it as an unhandled exception.
struct stop_execution{};
}
// Starts `thread_count` worker threads. Each worker registers its name with
// the logger and then repeatedly pops a task from task_queue_ and runs it.
//
// name         - base name for the workers. With exactly one thread the name
//                is used as-is; otherwise each worker's name is built by
//                util::to_string from the name, '#' and the worker index.
// thread_count - number of worker threads to start.
//
// Exceptions escaping a task are logged and swallowed so the worker keeps
// running; only stop_execution terminates the worker loop.
threadpool::threadpool(std::string const & name, std::size_t thread_count)
    : working_count_{0}
{
    for (std::size_t index = 0; index < thread_count; ++index)
    {
        std::string label = thread_count == 1 ? name : util::to_string(name, '#', index);

        threads_.emplace_back([this, label = std::move(label)]
        {
            log::register_thread(label);

            bool keep_running = true;
            while (keep_running)
            {
                auto job = task_queue_.pop();

                // Count this worker as busy while the task runs.
                // NOTE(review): if pop() removes the task from the queue before
                // returning, there is a short window here in which the queue
                // size and working_count_ can both be zero although a task is
                // about to run; wait() could observe that state - confirm.
                {
                    std::lock_guard guard{working_count_mutex_};
                    ++working_count_;
                }

                try
                {
                    job();
                }
                catch (stop_execution const &)
                {
                    // Stop request: finish this iteration's bookkeeping, then exit.
                    keep_running = false;
                }
                catch (std::exception const & error)
                {
                    log::error() << "Unhandled exception in threadpool executor: " << error.what();
                }
                catch (...)
                {
                    log::error() << "Unhandled unknown exception in threadpool executor";
                }

                {
                    std::lock_guard guard{working_count_mutex_};
                    --working_count_;
                }
                // Notify after releasing the mutex so woken waiters can
                // re-check their predicate immediately.
                working_count_cv_.notify_all();
            }
        });
    }
}
// Hands the task `t` over to task_queue_; the worker threads started by the
// constructor pop tasks from that queue and run them.
void threadpool::post(task t)
{
task_queue_.push(std::move(t));
}
// Schedules the task `t` for the time point `time`.
// If `time` is not strictly in the future, the task is posted right away via
// post(); otherwise it is handed to the queue's push_at() with that time.
void threadpool::post_at(clock::time_point time, task t)
{
    if (time <= clock::now())
    {
        post(std::move(t));
        return;
    }

    task_queue_.push_at(time, std::move(t));
}
// Stops the pool: discards the queued tasks, asks every worker to exit, and
// then destroys the thread objects.
void threadpool::stop()
{
    // Tasks still waiting in the queue are dropped, not executed.
    task_queue_.clear();

    // Enqueue one stop task per worker. A worker that runs one catches
    // stop_execution and leaves its loop, so it never pops a second one.
    for (std::size_t remaining = threads_.size(); remaining > 0; --remaining)
        task_queue_.push([]{ throw stop_execution{}; });

    // NOTE(review): presumably the element type of threads_ joins in its
    // destructor - confirm against the declaration in threadpool.hpp.
    threads_.clear();
}
// Blocks the caller until no task is being executed (working_count_ is zero)
// and task_queue_ reports a size of zero.
void threadpool::wait()
{
    std::unique_lock lock{working_count_mutex_};

    // Woken by the workers' notify_all() after each finished task; re-check
    // the condition every time to cope with spurious wake-ups.
    while (working_count_ != 0 || task_queue_.size() != 0)
        working_count_cv_.wait(lock);
}
// Like wait(), but gives up after `period` has elapsed.
// The result of the timed wait is discarded, so the caller cannot tell from
// this call whether the pool became idle or the wait timed out.
void threadpool::wait_for(clock::duration period)
{
    std::unique_lock lock{working_count_mutex_};

    auto const is_idle = [this]
    {
        return working_count_ == 0 && task_queue_.size() == 0;
    };

    working_count_cv_.wait_for(lock, period, is_idle);
}
// Like wait(), but gives up once the time point `time` has been reached.
// The result of the timed wait is discarded, so the caller cannot tell from
// this call whether the pool became idle or the deadline passed.
void threadpool::wait_until(clock::time_point time)
{
    std::unique_lock lock{working_count_mutex_};

    auto const is_idle = [this]
    {
        return working_count_ == 0 && task_queue_.size() == 0;
    };

    working_count_cv_.wait_until(lock, time, is_idle);
}
// Returns the number of tasks the pool currently holds: those reported by
// task_queue_.size() plus those being executed right now (working_count_).
// working_count_mutex_ is held while both values are read.
std::size_t threadpool::task_count() const
{
    std::lock_guard guard{working_count_mutex_};

    auto const queued = task_queue_.size();
    return queued + working_count_;
}
}