Reimplement async::future without std::future

This commit is contained in:
Nikita Lisitsa 2021-03-04 11:02:11 +03:00
parent fccf2e382e
commit ef34071748
2 changed files with 191 additions and 127 deletions

View file

@ -1,140 +1,14 @@
#pragma once #pragma once
#include <psemek/async/future.hpp>
#include <psemek/util/function.hpp> #include <psemek/util/function.hpp>
#include <future>
#include <chrono> #include <chrono>
#include <memory> #include <memory>
namespace psemek::async namespace psemek::async
{ {
namespace detail
{
template <typename T>
struct task_state
{
std::promise<T> promise;
std::atomic<bool> canceled = false;
bool const auto_cancel;
task_state(bool auto_cancel)
: auto_cancel{auto_cancel}
{}
};
template <typename T, typename F, typename ... Args>
auto wrap_task(std::shared_ptr<task_state<T>> state, F && f, Args && ... args)
{
return [state, f = std::forward<F>(f), ... args = std::forward<Args>(args)]() mutable {
if (state->canceled) return;
try
{
if constexpr (std::is_same_v<T, void>)
{
std::forward<F>(f)(std::forward<Args>(args)...);
state->promise.set_value();
}
else
{
state->promise.set_value(std::forward<F>(f)(std::forward<Args>(args)...));
}
}
catch(...)
{
state->promise.set_exception(std::current_exception());
}
};
}
}
struct canceled_task_error
: std::exception
{
char const * what() const noexcept { return "task canceled"; }
};
struct auto_cancel_tag{};
constexpr auto_cancel_tag auto_cancel;
template <typename T>
struct future
{
using result_type = T;
future() = default;
future(future&&) = default;
future(std::shared_ptr<detail::task_state<T>> state)
: state_(std::move(state))
, f_(state_->promise.get_future())
{}
future & operator = (future&&) = default;
~future()
{
reset();
}
void reset()
{
if (state_ && state_->auto_cancel)
cancel();
state_.reset();
f_ = std::future<T>();
}
explicit operator bool() const
{
return static_cast<bool>(state_);
}
bool wait() const
{
f_.wait();
return true;
}
template <typename Duration>
bool wait_for(Duration period) const
{
return f_.wait_for(period) == std::future_status::ready;
}
template <typename TimePoint>
bool wait_until(TimePoint time) const
{
return f_.wait_until(time) == std::future_status::ready;
}
T get()
{
if (state_ && state_->canceled)
throw canceled_task_error{};
return f_.get();
}
void cancel()
{
if (state_)
state_->canceled = true;
}
bool ready() const
{
return wait_for(std::chrono::seconds{0});
}
private:
std::shared_ptr<detail::task_state<T>> state_;
std::future<T> f_;
};
using clock = std::chrono::high_resolution_clock; using clock = std::chrono::high_resolution_clock;
struct executor struct executor

View file

@ -0,0 +1,190 @@
#pragma once
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <exception>
#include <optional>
namespace psemek::async
{
namespace detail
{
template <typename T>
struct task_value_container
{
using type = std::optional<T>;
};
template <>
struct task_value_container<void>
{
using type = bool;
};
template <typename T>
struct task_state
{
std::atomic<bool> canceled = false;
bool const auto_cancel;
std::mutex value_mutex;
task_value_container<T>::type value;
std::exception_ptr exception;
std::condition_variable value_cv;
task_state(bool auto_cancel)
: auto_cancel{auto_cancel}
{}
};
template <typename T, typename F, typename ... Args>
auto wrap_task(std::shared_ptr<task_state<T>> state, F && f, Args && ... args)
{
return [state, f = std::forward<F>(f), ... args = std::forward<Args>(args)]() mutable {
if (state->canceled) return;
try
{
if constexpr (std::is_same_v<T, void>)
{
std::forward<F>(f)(std::forward<Args>(args)...);
std::lock_guard lock{state->value_mutex};
state->value = true;
state->value_cv.notify_all();
}
else
{
auto value = std::forward<F>(f)(std::forward<Args>(args)...);
std::lock_guard lock{state->value_mutex};
state->value = std::move(value);
state->value_cv.notify_all();
}
}
catch(...)
{
std::lock_guard lock{state->value_mutex};
state->exception = std::current_exception();
state->value_cv.notify_all();
}
};
}
}
struct empty_future_error
: std::exception
{
char const * what() const noexcept { return "future is empty"; }
};
struct canceled_task_error
: std::exception
{
char const * what() const noexcept { return "task canceled"; }
};
struct auto_cancel_tag{};
constexpr auto_cancel_tag auto_cancel;
template <typename T>
struct future
{
using result_type = T;
future() = default;
future(future&&) = default;
future(std::shared_ptr<detail::task_state<T>> state)
: state_(std::move(state))
{}
future & operator = (future&&) = default;
~future()
{
reset();
}
void reset()
{
if (state_ && state_->auto_cancel)
cancel();
state_.reset();
}
explicit operator bool() const
{
return static_cast<bool>(state_);
}
bool wait() const
{
if (!state_) return false;
std::unique_lock lock(state_->value_mutex);
state_->value_cv.wait(lock, [this]{ return has_value_unsafe(); });
return true;
}
template <typename Duration>
bool wait_for(Duration period) const
{
if (!state_) return false;
std::unique_lock lock(state_->value_mutex);
state_->value_cv.wait_for(lock, period, [this]{ return has_value_unsafe(); });
return has_value_unsafe();
}
template <typename TimePoint>
bool wait_until(TimePoint time) const
{
if (!state_) return false;
std::unique_lock lock(state_->value_mutex);
state_->value_cv.wait_until(lock, time, [this]{ return has_value_unsafe(); });
return has_value_unsafe();
}
T get()
{
if (!state_)
throw empty_future_error{};
if (state_->canceled)
throw canceled_task_error{};
wait();
std::lock_guard lock{state_->value_mutex};
if (state_->value)
{
if constexpr (std::is_same_v<T, void>)
return;
else
return std::move(*(state_->value));
}
else
std::rethrow_exception(state_->exception);
}
void cancel()
{
if (state_)
state_->canceled = true;
}
bool ready() const
{
return wait_for(std::chrono::seconds{0});
}
private:
std::shared_ptr<detail::task_state<T>> state_;
bool has_value_unsafe() const
{
return state_->value || state_->exception;
}
};
}