Add an executor interface that supports cancelable tasks

This commit is contained in:
Nikita Lisitsa 2020-11-19 23:48:33 +03:00
parent 0a1faf14c7
commit 8bfc6cfd72

View file

@ -0,0 +1,148 @@
#pragma once
#include <psemek/util/movable_function.hpp>
#include <future>
#include <chrono>
#include <memory>
namespace psemek::util
{
namespace detail
{
template <typename T>
struct task_state
{
std::promise<T> promise;
std::atomic<bool> canceled = false;
};
}
struct canceled_task_error
: std::exception
{
char const * what() const noexcept { return "task canceled"; }
};
template <typename T>
struct future
{
future(std::shared_ptr<detail::task_state<T>> state)
: state_(std::move(state))
, f_(state_->promise.get_future())
{}
bool wait()
{
f_.wait();
return true;
}
template <typename Duration>
bool wait_for(Duration period)
{
return f_.wait_for(period) == std::future_status::ready;
}
template <typename TimePoint>
bool wait_until(TimePoint time)
{
return f_.wait_until(time) == std::future_status::ready;
}
T get()
{
if (state_->canceled)
throw canceled_task_error{};
return f_.get();
}
void cancel()
{
state_->canceled = true;
}
private:
std::shared_ptr<detail::task_state<T>> state_;
std::future<T> f_;
};
struct executor
{
using task = movable_function<void()>;
using clock = std::chrono::high_resolution_clock;
// Post the task for execution. Where and when will
// it be executed is up to a concrete executor, as
// well as exception handling policy.
// NB: it is better to use the "dispatch" method.
virtual void post(task t) = 0;
// Stop the executor. No tasks will be executed
// after this function returns.
// NB: the executor must call stop() from destructor.
virtual void stop() = 0;
// Wait for all the tasks to be executed.
// May take forever.
virtual void wait() = 0;
// Wait for all the tasks to be executed,
// but no longer than period.
virtual void wait_for(clock::duration period) = 0;
// Wait for all the tasks to be executed,
// but no longer than time.
virtual void wait_until(clock::time_point time) = 0;
// The number of tasks that were posted but havent
// finished yet.
// NB: since the executor may run in another thread,
// the return value of this function might change
// between calls.
virtual std::size_t task_count() const = 0;
// Post a callable for execution. Retuns a future.
template <typename F, typename ... Args>
auto dispatch(F && f, Args && ... args);
virtual ~executor() {}
};
template <typename F, typename ... Args>
auto executor::dispatch(F && f, Args && ... args)
{
using R = decltype(f());
auto state = std::make_shared<detail::task_state<R>>();
auto func = [state, f = std::forward<F>(f), ... args = std::forward<Args>(args)]() mutable {
if (state->canceled) return;
try
{
if constexpr (std::is_same_v<R, void>)
{
std::forward<F>(f)(std::forward<Args>(args)...);
state->promise.set_value();
}
else
{
state->promise.set_value(std::forward<F>(f)(std::forward<Args>(args)...));
}
}
catch(...)
{
state->promise.set_exception(std::current_exception());
}
};
post(func);
return future<R>(state);
}
}