diff --git a/examples/srtm.cpp b/examples/srtm.cpp index 2da413ee..85de728f 100644 --- a/examples/srtm.cpp +++ b/examples/srtm.cpp @@ -270,7 +270,7 @@ struct node_controller std::size_t node_count() const { return node_count_; } - std::size_t loader_queue_size() const { return loader_.queue_size(); } + std::size_t loader_queue_size() const { return loader_.task_count(); } void preload(int max_level); diff --git a/libs/util/include/psemek/util/threadpool.hpp b/libs/util/include/psemek/util/threadpool.hpp index 8d6d9e34..270453a0 100644 --- a/libs/util/include/psemek/util/threadpool.hpp +++ b/libs/util/include/psemek/util/threadpool.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -14,58 +15,35 @@ namespace psemek::util { struct threadpool + : executor { threadpool(std::string const & name) : threadpool(name, std::max(1u, std::thread::hardware_concurrency())) {} - threadpool(std::string const & name, std::size_t thread_count) - : name(name) - , working_count_{0} - { - start(thread_count); - } + threadpool(std::string const & name, std::size_t thread_count); - ~ threadpool() - { - stop(); - } + ~threadpool() override { stop(); } - template - auto dispatch(F && f) - { - using R = decltype(f()); + void post(task t) override; - std::packaged_task task{std::forward(f)}; + void stop() override; - auto result = task.get_future(); + void wait() override; - tasks_queue.push(std::move(task)); + void wait_for(clock::duration period) override; - return result; - } + void wait_until(clock::time_point time) override; - void start(std::size_t thread_count); - - void stop(); - - void wait() - { - tasks_queue.wait(); - - std::unique_lock lock{working_count_mutex_}; - working_count_cv_.wait(lock, [this]{ return working_count_ == 0; }); - } - - std::size_t queue_size() const { return tasks_queue.size(); } + std::size_t task_count() const override; private: - std::string const name; - std::vector threads; - util::synchronized_queue> tasks_queue; + std::vector threads_; + util::synchronized_queue task_queue_; + std::atomic running_; std::size_t working_count_; - std::mutex working_count_mutex_; + mutable std::mutex working_count_mutex_; std::condition_variable working_count_cv_; }; diff --git a/libs/util/source/threadpool.cpp b/libs/util/source/threadpool.cpp index cee1a6e0..da49cb67 100644 --- a/libs/util/source/threadpool.cpp +++ b/libs/util/source/threadpool.cpp @@ -1,31 +1,55 @@ #include #include #include +#include #include namespace psemek::util { - void threadpool::start(std::size_t thread_count) + namespace + { + + struct stop_execution{}; + + } + + threadpool::threadpool(std::string const & name, std::size_t thread_count) + : running_{true} + , working_count_{0} { for (std::size_t th = 0; th < thread_count; ++th) { - threads.emplace_back([this, th] + threads_.emplace_back([this, &name, th, thread_count] { - log::register_thread(to_string(name, '#', th)); - while (true) + log::register_thread(thread_count == 1 ? name : to_string(name, '#', th)); + for (bool running = true; running;) { - auto task = tasks_queue.pop(); - - if (!task) - break; + auto task = task_queue_.pop(); { std::lock_guard lock{working_count_mutex_}; ++working_count_; } - task(); + + try + { + task(); + } + catch (stop_execution const &) + { + running = false; + } + catch (std::exception const & e) + { + log::error() << "Unhandled exception in threadpool executor: " << e.what(); + } + catch (...) + { + log::error() << "Unhandled unknown exception in threadpool executor"; + } + { std::lock_guard lock{working_count_mutex_}; --working_count_; @@ -36,15 +60,44 @@ namespace psemek::util } } + void threadpool::post(task t) + { + task_queue_.push(std::move(t)); + } + void threadpool::stop() { - tasks_queue.clear(); - for (auto const & thread: threads) + task_queue_.clear(); + for (auto const & thread: threads_) { unused(thread); - tasks_queue.push({}); + task_queue_.push([]{ throw stop_execution{}; }); } - threads.clear(); + threads_.clear(); + } + + void threadpool::wait() + { + std::unique_lock lock{working_count_mutex_}; + working_count_cv_.wait(lock, [this]{ return working_count_ == 0 && task_queue_.size() == 0; }); + } + + void threadpool::wait_for(clock::duration period) + { + std::unique_lock lock{working_count_mutex_}; + working_count_cv_.wait_for(lock, period, [this]{ return working_count_ == 0 && task_queue_.size() == 0; }); + } + + void threadpool::wait_until(clock::time_point time) + { + std::unique_lock lock{working_count_mutex_}; + working_count_cv_.wait_until(lock, time, [this]{ return working_count_ == 0 && task_queue_.size() == 0; }); + } + + std::size_t threadpool::task_count() const + { + std::unique_lock lock{working_count_mutex_}; + return task_queue_.size() + working_count_; } }