From 8bfc6cfd728d536de05be7c1f5892ffe35b2c57d Mon Sep 17 00:00:00 2001 From: lisyarus Date: Thu, 19 Nov 2020 23:48:33 +0300 Subject: [PATCH] Add an executor interface that supports cancelable tasks --- libs/util/include/psemek/util/executor.hpp | 148 +++++++++++++++++++++ 1 file changed, 148 insertions(+) create mode 100644 libs/util/include/psemek/util/executor.hpp diff --git a/libs/util/include/psemek/util/executor.hpp b/libs/util/include/psemek/util/executor.hpp new file mode 100644 index 00000000..ff17c014 --- /dev/null +++ b/libs/util/include/psemek/util/executor.hpp @@ -0,0 +1,148 @@ +#pragma once + +#include + +#include +#include +#include + +namespace psemek::util +{ + + namespace detail + { + + template + struct task_state + { + std::promise promise; + std::atomic canceled = false; + }; + + } + + struct canceled_task_error + : std::exception + { + char const * what() const noexcept { return "task canceled"; } + }; + + template + struct future + { + future(std::shared_ptr> state) + : state_(std::move(state)) + , f_(state_->promise.get_future()) + {} + + bool wait() + { + f_.wait(); + return true; + } + + template + bool wait_for(Duration period) + { + return f_.wait_for(period) == std::future_status::ready; + } + + template + bool wait_until(TimePoint time) + { + return f_.wait_until(time) == std::future_status::ready; + } + + T get() + { + if (state_->canceled) + throw canceled_task_error{}; + return f_.get(); + } + + void cancel() + { + state_->canceled = true; + } + + private: + std::shared_ptr> state_; + std::future f_; + }; + + struct executor + { + using task = movable_function; + using clock = std::chrono::high_resolution_clock; + + // Post the task for execution. Where and when will + // it be executed is up to a concrete executor, as + // well as exception handling policy. + // NB: it is better to use the "dispatch" method. + virtual void post(task t) = 0; + + // Stop the executor. No tasks will be executed + // after this function returns. + // NB: the executor must call stop() from destructor. + virtual void stop() = 0; + + // Wait for all the tasks to be executed. + // May take forever. + virtual void wait() = 0; + + // Wait for all the tasks to be executed, + // but no longer than period. + virtual void wait_for(clock::duration period) = 0; + + // Wait for all the tasks to be executed, + // but no longer than time. + virtual void wait_until(clock::time_point time) = 0; + + // The number of tasks that were posted but havent + // finished yet. + // NB: since the executor may run in another thread, + // the return value of this function might change + // between calls. + virtual std::size_t task_count() const = 0; + + // Post a callable for execution. Retuns a future. + template + auto dispatch(F && f, Args && ... args); + + virtual ~executor() {} + }; + + template + auto executor::dispatch(F && f, Args && ... args) + { + using R = decltype(f()); + + auto state = std::make_shared>(); + + auto func = [state, f = std::forward(f), ... args = std::forward(args)]() mutable { + if (state->canceled) return; + + try + { + if constexpr (std::is_same_v) + { + std::forward(f)(std::forward(args)...); + state->promise.set_value(); + } + else + { + state->promise.set_value(std::forward(f)(std::forward(args)...)); + } + } + catch(...) + { + state->promise.set_exception(std::current_exception()); + } + }; + + post(func); + + return future(state); + } + +}