#include #include #include #include #include #include namespace psemek::async { threadpool::threadpool(std::string const & name, std::size_t thread_count) : working_count_{0} { for (std::size_t th = 0; th < thread_count; ++th) { std::string tname = thread_count == 1 ? name : util::to_string(name, th); threads_.emplace_back([this, tname = std::move(tname)]() mutable { log::thread_registrator reg(std::move(tname)); while (true) { auto task = task_queue_.pop(); if (!task) break; { std::lock_guard lock{working_count_mutex_}; ++working_count_; } try { prof::profiler prof("task"); task(); } catch (util::exception const & e) { log::error() << "Unhandled exception in threadpool executor: " << e; } catch (std::exception const & e) { log::error() << "Unhandled exception in threadpool executor: " << e.what(); } catch (...) { log::error() << "Unhandled unknown exception in threadpool executor"; } { std::lock_guard lock{working_count_mutex_}; --working_count_; } working_count_cv_.notify_all(); } }); } } void threadpool::post(task t) { task_queue_.push(std::move(t)); } void threadpool::post_at(clock::time_point time, task t) { if (time > clock::now()) { task_queue_.push_at(time, std::move(t)); } else post(std::move(t)); } void threadpool::stop() { task_queue_.clear(); for (auto const & thread: threads_) { unused(thread); task_queue_.push(nullptr); } threads_.clear(); } void threadpool::clear() { task_queue_.clear(); } void threadpool::wait() { std::unique_lock lock{working_count_mutex_}; working_count_cv_.wait(lock, [this]{ return working_count_ == 0 && task_queue_.size() == 0; }); } void threadpool::wait_for(clock::duration period) { std::unique_lock lock{working_count_mutex_}; working_count_cv_.wait_for(lock, period, [this]{ return working_count_ == 0 && task_queue_.size() == 0; }); } void threadpool::wait_until(clock::time_point time) { std::unique_lock lock{working_count_mutex_}; working_count_cv_.wait_until(lock, time, [this]{ return working_count_ == 0 && task_queue_.size() == 0; }); } std::size_t threadpool::task_count() const { std::unique_lock lock{working_count_mutex_}; return task_queue_.size() + working_count_; } }