From 240d26663b8feb9bf28128e1069b50853f8d747f Mon Sep 17 00:00:00 2001 From: lisyarus Date: Sat, 21 Nov 2020 15:16:47 +0300 Subject: [PATCH] Add deferred event support to executors --- libs/util/include/psemek/util/event_loop.hpp | 18 +++++- libs/util/include/psemek/util/executor.hpp | 67 +++++++++++++------- libs/util/include/psemek/util/threadpool.hpp | 3 +- libs/util/source/event_loop.cpp | 59 +++++++++++++++-- libs/util/source/threadpool.cpp | 13 +++- 5 files changed, 126 insertions(+), 34 deletions(-) diff --git a/libs/util/include/psemek/util/event_loop.hpp b/libs/util/include/psemek/util/event_loop.hpp index c5ea7048..ff67541c 100644 --- a/libs/util/include/psemek/util/event_loop.hpp +++ b/libs/util/include/psemek/util/event_loop.hpp @@ -3,6 +3,7 @@ #include #include +#include namespace psemek::util { @@ -12,7 +13,9 @@ namespace psemek::util { void post(task t) override; - void stop() override {} + void post_at(clock::time_point time, task t) override; + + void stop() override; void wait() override; @@ -20,12 +23,23 @@ namespace psemek::util void wait_until(clock::time_point time) override; - std::size_t task_count() const override { return task_queue_.size(); } + std::size_t task_count() const override; ~event_loop() override { stop(); } private: + struct deferred_task + { + clock::time_point time; + task func; + }; + std::deque task_queue_; + std::vector deferred_task_heap_; + + static auto heap_compare(); + + void flush_deferred(); }; diff --git a/libs/util/include/psemek/util/executor.hpp b/libs/util/include/psemek/util/executor.hpp index ff17c014..729d65ab 100644 --- a/libs/util/include/psemek/util/executor.hpp +++ b/libs/util/include/psemek/util/executor.hpp @@ -19,6 +19,31 @@ namespace psemek::util std::atomic canceled = false; }; + template + auto wrap_task(std::shared_ptr> state, F && f, Args && ... args) + { + return [state, f = std::forward(f), ... args = std::forward(args)]() mutable { + if (state->canceled) return; + + try + { + if constexpr (std::is_same_v) + { + std::forward(f)(std::forward(args)...); + state->promise.set_value(); + } + else + { + state->promise.set_value(std::forward(f)(std::forward(args)...)); + } + } + catch(...) + { + state->promise.set_exception(std::current_exception()); + } + }; + } + } struct canceled_task_error @@ -81,6 +106,10 @@ namespace psemek::util // NB: it is better to use the "dispatch" method. virtual void post(task t) = 0; + // Post the task for execution at a certain time point. + // NB: it is better to use the "dispatch_at" method. + virtual void post_at(clock::time_point time, task t) = 0; + // Stop the executor. No tasks will be executed // after this function returns. // NB: the executor must call stop() from destructor. @@ -109,6 +138,11 @@ namespace psemek::util template auto dispatch(F && f, Args && ... args); + // Post a callable for execution at a certain time point. + // Retuns a future. + template + auto dispatch_at(TimePoint time, F && f, Args && ... args); + virtual ~executor() {} }; @@ -116,32 +150,17 @@ namespace psemek::util auto executor::dispatch(F && f, Args && ... args) { using R = decltype(f()); - auto state = std::make_shared>(); + post(detail::wrap_task(state, std::forward(f), std::forward(args)...)); + return future(state); + } - auto func = [state, f = std::forward(f), ... args = std::forward(args)]() mutable { - if (state->canceled) return; - - try - { - if constexpr (std::is_same_v) - { - std::forward(f)(std::forward(args)...); - state->promise.set_value(); - } - else - { - state->promise.set_value(std::forward(f)(std::forward(args)...)); - } - } - catch(...) - { - state->promise.set_exception(std::current_exception()); - } - }; - - post(func); - + template + auto executor::dispatch_at(TimePoint time, F && f, Args && ... args) + { + using R = decltype(f()); + auto state = std::make_shared>(); + post_at(std::chrono::time_point_cast(time), detail::wrap_task(state, std::forward(f), std::forward(args)...)); return future(state); } diff --git a/libs/util/include/psemek/util/threadpool.hpp b/libs/util/include/psemek/util/threadpool.hpp index 270453a0..ced6fc55 100644 --- a/libs/util/include/psemek/util/threadpool.hpp +++ b/libs/util/include/psemek/util/threadpool.hpp @@ -27,6 +27,8 @@ namespace psemek::util void post(task t) override; + void post_at(clock::time_point time, task t) override; + void stop() override; void wait() override; @@ -40,7 +42,6 @@ namespace psemek::util private: std::vector threads_; util::synchronized_queue task_queue_; - std::atomic running_; std::size_t working_count_; mutable std::mutex working_count_mutex_; diff --git a/libs/util/source/event_loop.cpp b/libs/util/source/event_loop.cpp index 2064d08a..59462247 100644 --- a/libs/util/source/event_loop.cpp +++ b/libs/util/source/event_loop.cpp @@ -1,20 +1,52 @@ #include +#include + namespace psemek::util { + auto event_loop::heap_compare() + { + return [](deferred_task const & t1, deferred_task const & t2){ return t1.time > t2.time; }; + } + void event_loop::post(task t) { task_queue_.push_back(std::move(t)); } + void event_loop::post_at(clock::time_point time, task t) + { + if (time > clock::now()) + { + deferred_task_heap_.push_back(deferred_task{time, std::move(t)}); + std::push_heap(deferred_task_heap_.begin(), deferred_task_heap_.end(), heap_compare()); + } + else + task_queue_.push_back(std::move(t)); + } + + void event_loop::stop() + { + task_queue_.clear(); + deferred_task_heap_.clear(); + } + void event_loop::wait() { - while (!task_queue_.empty()) + while (!task_queue_.empty() || !deferred_task_heap_.empty()) { - auto t = std::move(task_queue_.front()); - task_queue_.pop_front(); - t(); + flush_deferred(); + if (task_queue_.empty()) + { + std::this_thread::sleep_until(deferred_task_heap_.front().time); + } + else + { + auto t = std::move(task_queue_.front()); + task_queue_.pop_front(); + t(); + } } } @@ -25,12 +57,29 @@ namespace psemek::util void event_loop::wait_until(clock::time_point time) { - while (!task_queue_.empty() && clock::now() < time) + while ((!task_queue_.empty() || !deferred_task_heap_.empty()) && clock::now() < time) { + flush_deferred(); auto t = std::move(task_queue_.front()); task_queue_.pop_front(); t(); } } + std::size_t event_loop::task_count() const + { + return task_queue_.size() + deferred_task_heap_.size(); + } + + void event_loop::flush_deferred() + { + auto const now = clock::now(); + while (!deferred_task_heap_.empty() && deferred_task_heap_.front().time <= now) + { + task_queue_.push_back(std::move(deferred_task_heap_.front().func)); + std::pop_heap(deferred_task_heap_.begin(), deferred_task_heap_.end(), heap_compare()); + deferred_task_heap_.pop_back(); + } + } + } diff --git a/libs/util/source/threadpool.cpp b/libs/util/source/threadpool.cpp index da49cb67..e0fa4e47 100644 --- a/libs/util/source/threadpool.cpp +++ b/libs/util/source/threadpool.cpp @@ -16,8 +16,7 @@ namespace psemek::util } threadpool::threadpool(std::string const & name, std::size_t thread_count) - : running_{true} - , working_count_{0} + : working_count_{0} { for (std::size_t th = 0; th < thread_count; ++th) { @@ -65,6 +64,16 @@ namespace psemek::util task_queue_.push(std::move(t)); } + void threadpool::post_at(clock::time_point time, task t) + { + if (time > clock::now()) + { + task_queue_.push_at(time, std::move(t)); + } + else + post(std::move(t)); + } + void threadpool::stop() { task_queue_.clear();