Huge async rewrite: std-like future, promise & packaged task

This commit is contained in:
Nikita Lisitsa 2021-03-04 17:35:16 +03:00
parent c26cf898ca
commit dd32ab00d3
2 changed files with 203 additions and 53 deletions

View file

@ -81,36 +81,40 @@ namespace psemek::async
auto executor::dispatch(F && f, Args && ... args) auto executor::dispatch(F && f, Args && ... args)
{ {
using R = decltype(f()); using R = decltype(f());
auto state = std::make_shared<detail::task_state<R>>(false); packaged_task<R()> t([f = std::forward<F>(f), ... args = std::forward<Args>(args)]() mutable { return std::forward<F>(f)(std::forward<Args>(args)...); });
post(detail::wrap_task(state, std::forward<F>(f), std::forward<Args>(args)...)); auto fut = t.get_future();
return future<R>(state); post(std::move(t));
return fut;
} }
template <typename F, typename ... Args> template <typename F, typename ... Args>
auto executor::dispatch(auto_cancel_tag, F && f, Args && ... args) auto executor::dispatch(auto_cancel_tag tag, F && f, Args && ... args)
{ {
using R = decltype(f()); using R = decltype(f());
auto state = std::make_shared<detail::task_state<R>>(true); packaged_task<R()> t(tag, [f = std::forward<F>(f), ... args = std::forward<Args>(args)]() mutable { return std::forward<F>(f)(std::forward<Args>(args)...); });
post(detail::wrap_task(state, std::forward<F>(f), std::forward<Args>(args)...)); auto fut = t.get_future();
return future<R>(state); post(std::move(t));
return fut;
} }
template <typename TimePoint, typename F, typename ... Args> template <typename TimePoint, typename F, typename ... Args>
auto executor::dispatch_at(TimePoint time, F && f, Args && ... args) auto executor::dispatch_at(TimePoint time, F && f, Args && ... args)
{ {
using R = decltype(f()); using R = decltype(f());
auto state = std::make_shared<detail::task_state<R>>(false); packaged_task<R()> t([f = std::forward<F>(f), ... args = std::forward<Args>(args)]() mutable { return std::forward<F>(f)(std::forward<Args>(args)...); });
post_at(std::chrono::time_point_cast<clock::duration>(time), detail::wrap_task(state, std::forward<F>(f), std::forward<Args>(args)...)); auto fut = t.get_future();
return future<R>(state); post_at(time, std::move(t));
return fut;
} }
template <typename TimePoint, typename F, typename ... Args> template <typename TimePoint, typename F, typename ... Args>
auto executor::dispatch_at(TimePoint time, auto_cancel_tag, F && f, Args && ... args) auto executor::dispatch_at(TimePoint time, auto_cancel_tag tag, F && f, Args && ... args)
{ {
using R = decltype(f()); using R = decltype(f());
auto state = std::make_shared<detail::task_state<R>>(true); packaged_task<R()> t(tag, [f = std::forward<F>(f), ... args = std::forward<Args>(args)]() mutable { return std::forward<F>(f)(std::forward<Args>(args)...); });
post_at(std::chrono::time_point_cast<clock::duration>(time), detail::wrap_task(state, std::forward<F>(f), std::forward<Args>(args)...)); auto fur = t.get_future();
return future<R>(state); post_at(time, std::move(t));
return fur;
} }
template <typename Iterator> template <typename Iterator>

View file

@ -1,16 +1,30 @@
#pragma once #pragma once
#include <psemek/util/function.hpp>
#include <atomic> #include <atomic>
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <exception> #include <exception>
#include <optional> #include <optional>
#include <vector>
namespace psemek::async namespace psemek::async
{ {
namespace detail namespace detail
{ {
template <typename T>
struct get_return_type
{
using type = T &;
};
template <>
struct get_return_type<void>
{
using type = void;
};
template <typename T> template <typename T>
struct task_value_container struct task_value_container
@ -31,7 +45,7 @@ namespace psemek::async
bool const auto_cancel; bool const auto_cancel;
std::mutex value_mutex; std::mutex value_mutex;
task_value_container<T>::type value; typename task_value_container<T>::type value{};
std::exception_ptr exception; std::exception_ptr exception;
std::condition_variable value_cv; std::condition_variable value_cv;
@ -41,38 +55,6 @@ namespace psemek::async
{} {}
}; };
template <typename T, typename F, typename ... Args>
auto wrap_task(std::shared_ptr<task_state<T>> state, F && f, Args && ... args)
{
return [state, f = std::forward<F>(f), ... args = std::forward<Args>(args)]() mutable {
if (state->canceled) return;
try
{
if constexpr (std::is_same_v<T, void>)
{
std::forward<F>(f)(std::forward<Args>(args)...);
std::lock_guard lock{state->value_mutex};
state->value = true;
state->value_cv.notify_all();
}
else
{
auto value = std::forward<F>(f)(std::forward<Args>(args)...);
std::lock_guard lock{state->value_mutex};
state->value = std::move(value);
state->value_cv.notify_all();
}
}
catch(...)
{
std::lock_guard lock{state->value_mutex};
state->exception = std::current_exception();
state->value_cv.notify_all();
}
};
}
} }
struct empty_future_error struct empty_future_error
@ -124,7 +106,7 @@ namespace psemek::async
bool wait() const bool wait() const
{ {
if (!state_) return false; if (!state_) throw empty_future_error{};
std::unique_lock lock(state_->value_mutex); std::unique_lock lock(state_->value_mutex);
state_->value_cv.wait(lock, [this]{ return has_value_unsafe(); }); state_->value_cv.wait(lock, [this]{ return has_value_unsafe(); });
return true; return true;
@ -133,7 +115,7 @@ namespace psemek::async
template <typename Duration> template <typename Duration>
bool wait_for(Duration period) const bool wait_for(Duration period) const
{ {
if (!state_) return false; if (!state_) throw empty_future_error{};
std::unique_lock lock(state_->value_mutex); std::unique_lock lock(state_->value_mutex);
state_->value_cv.wait_for(lock, period, [this]{ return has_value_unsafe(); }); state_->value_cv.wait_for(lock, period, [this]{ return has_value_unsafe(); });
return has_value_unsafe(); return has_value_unsafe();
@ -142,26 +124,25 @@ namespace psemek::async
template <typename TimePoint> template <typename TimePoint>
bool wait_until(TimePoint time) const bool wait_until(TimePoint time) const
{ {
if (!state_) return false; if (!state_) throw empty_future_error{};
std::unique_lock lock(state_->value_mutex); std::unique_lock lock(state_->value_mutex);
state_->value_cv.wait_until(lock, time, [this]{ return has_value_unsafe(); }); state_->value_cv.wait_until(lock, time, [this]{ return has_value_unsafe(); });
return has_value_unsafe(); return has_value_unsafe();
} }
T get() detail::get_return_type<T>::type get()
{ {
if (!state_) if (!state_)
throw empty_future_error{}; throw empty_future_error{};
if (state_->canceled) if (state_->canceled)
throw canceled_task_error{}; throw canceled_task_error{};
wait(); wait();
std::lock_guard lock{state_->value_mutex};
if (state_->value) if (state_->value)
{ {
if constexpr (std::is_same_v<T, void>) if constexpr (std::is_same_v<T, void>)
return; return;
else else
return std::move(*(state_->value)); return *(state_->value);
} }
else else
std::rethrow_exception(state_->exception); std::rethrow_exception(state_->exception);
@ -187,4 +168,169 @@ namespace psemek::async
} }
}; };
struct empty_promise_error
: std::exception
{
char const * what() const noexcept { return "promise is empty"; }
};
struct satisfied_promise_error
: std::exception
{
char const * what() const noexcept { return "promise already contains a value or exception"; }
};
template <typename T>
struct promise
{
promise(std::nullptr_t)
{}
promise()
: state_{std::make_shared<detail::task_state<T>>(false)}
{}
promise(auto_cancel_tag tag)
: state_{std::make_shared<detail::task_state<T>>(true)}
{}
explicit operator bool() const
{
return static_cast<bool>(state_);
}
void set_value(T const & value)
{
if (!state_) throw empty_promise_error{};
std::lock_guard lock{state_->value_mutex};
if (state_->value || state_->exception) throw satisfied_promise_error{};
state_->value = value;
}
void set_value(T && value)
{
if (!state_) throw empty_promise_error{};
std::lock_guard lock{state_->value_mutex};
if (state_->value || state_->exception) throw satisfied_promise_error{};
state_->value = std::move(value);
}
void set_exception(std::exception_ptr e)
{
if (!state_) throw empty_promise_error{};
std::lock_guard lock{state_->value_mutex};
if (state_->value || state_->exception) throw satisfied_promise_error{};
state_->exception = std::move(e);
}
future<T> get_future() const
{
return future<T>(state_);
}
private:
std::shared_ptr<detail::task_state<T>> state_;
};
template <>
struct promise<void>
{
promise(std::nullptr_t)
{}
promise()
: state_{std::make_shared<detail::task_state<void>>(false)}
{}
promise(auto_cancel_tag)
: state_{std::make_shared<detail::task_state<void>>(true)}
{}
explicit operator bool() const
{
return static_cast<bool>(state_);
}
void set_value()
{
if (!state_) throw empty_promise_error{};
std::lock_guard lock{state_->value_mutex};
if (state_->value || state_->exception) throw satisfied_promise_error{};
state_->value = true;
}
void set_exception(std::exception_ptr e)
{
if (!state_) throw empty_promise_error{};
std::lock_guard lock{state_->value_mutex};
if (state_->value || state_->exception) throw satisfied_promise_error{};
state_->exception = std::move(e);
}
future<void> get_future() const
{
return future<void>(state_);
}
private:
std::shared_ptr<detail::task_state<void>> state_;
};
template <typename Signature>
struct packaged_task;
template <typename R, typename ... Args>
struct packaged_task<R(Args...)>
{
packaged_task()
: promise_(nullptr)
{}
template <typename F>
packaged_task(F && f)
: func_(std::forward<F>(f))
{}
template <typename F>
packaged_task(auto_cancel_tag tag, F && f)
: promise_(tag)
, func_(std::forward<F>(f))
{}
explicit operator bool() const
{
return static_cast<bool>(promise_);
}
future<R> get_future() const
{
return promise_.get_future();
}
template <typename ... Args1>
void operator() (Args1 && ... args)
{
try
{
if constexpr (std::is_same_v<R, void>)
{
func_(std::forward<Args1>(args)...);
promise_.set_value();
}
else
{
promise_.set_value(func_(std::forward<Args1>(args)...));
}
}
catch(...)
{
promise_.set_exception(std::current_exception());
}
}
private:
promise<R> promise_;
util::function<R(Args...)> func_;
};
} }