diff --git a/libs/async/include/psemek/async/executor.hpp b/libs/async/include/psemek/async/executor.hpp index 79600206..8a940c90 100644 --- a/libs/async/include/psemek/async/executor.hpp +++ b/libs/async/include/psemek/async/executor.hpp @@ -1,140 +1,14 @@ #pragma once +#include #include -#include #include #include namespace psemek::async { - namespace detail - { - - template - struct task_state - { - std::promise promise; - std::atomic canceled = false; - bool const auto_cancel; - - task_state(bool auto_cancel) - : auto_cancel{auto_cancel} - {} - }; - - template - auto wrap_task(std::shared_ptr> state, F && f, Args && ... args) - { - return [state, f = std::forward(f), ... args = std::forward(args)]() mutable { - if (state->canceled) return; - - try - { - if constexpr (std::is_same_v) - { - std::forward(f)(std::forward(args)...); - state->promise.set_value(); - } - else - { - state->promise.set_value(std::forward(f)(std::forward(args)...)); - } - } - catch(...) - { - state->promise.set_exception(std::current_exception()); - } - }; - } - - } - - struct canceled_task_error - : std::exception - { - char const * what() const noexcept { return "task canceled"; } - }; - - struct auto_cancel_tag{}; - constexpr auto_cancel_tag auto_cancel; - - template - struct future - { - using result_type = T; - - future() = default; - future(future&&) = default; - - future(std::shared_ptr> state) - : state_(std::move(state)) - , f_(state_->promise.get_future()) - {} - - future & operator = (future&&) = default; - - ~future() - { - reset(); - } - - void reset() - { - if (state_ && state_->auto_cancel) - cancel(); - - state_.reset(); - f_ = std::future(); - } - - explicit operator bool() const - { - return static_cast(state_); - } - - bool wait() const - { - f_.wait(); - return true; - } - - template - bool wait_for(Duration period) const - { - return f_.wait_for(period) == std::future_status::ready; - } - - template - bool wait_until(TimePoint time) const - { - return f_.wait_until(time) == std::future_status::ready; - } - - T get() - { - if (state_ && state_->canceled) - throw canceled_task_error{}; - return f_.get(); - } - - void cancel() - { - if (state_) - state_->canceled = true; - } - - bool ready() const - { - return wait_for(std::chrono::seconds{0}); - } - - private: - std::shared_ptr> state_; - std::future f_; - }; - using clock = std::chrono::high_resolution_clock; struct executor diff --git a/libs/async/include/psemek/async/future.hpp b/libs/async/include/psemek/async/future.hpp new file mode 100644 index 00000000..2590f981 --- /dev/null +++ b/libs/async/include/psemek/async/future.hpp @@ -0,0 +1,190 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace psemek::async +{ + + namespace detail + { + + template + struct task_value_container + { + using type = std::optional; + }; + + template <> + struct task_value_container + { + using type = bool; + }; + + template + struct task_state + { + std::atomic canceled = false; + bool const auto_cancel; + + std::mutex value_mutex; + task_value_container::type value; + std::exception_ptr exception; + + std::condition_variable value_cv; + + task_state(bool auto_cancel) + : auto_cancel{auto_cancel} + {} + }; + + template + auto wrap_task(std::shared_ptr> state, F && f, Args && ... args) + { + return [state, f = std::forward(f), ... args = std::forward(args)]() mutable { + if (state->canceled) return; + + try + { + if constexpr (std::is_same_v) + { + std::forward(f)(std::forward(args)...); + std::lock_guard lock{state->value_mutex}; + state->value = true; + state->value_cv.notify_all(); + } + else + { + auto value = std::forward(f)(std::forward(args)...); + std::lock_guard lock{state->value_mutex}; + state->value = std::move(value); + state->value_cv.notify_all(); + } + } + catch(...) + { + std::lock_guard lock{state->value_mutex}; + state->exception = std::current_exception(); + state->value_cv.notify_all(); + } + }; + } + + } + + struct empty_future_error + : std::exception + { + char const * what() const noexcept { return "future is empty"; } + }; + + struct canceled_task_error + : std::exception + { + char const * what() const noexcept { return "task canceled"; } + }; + + struct auto_cancel_tag{}; + constexpr auto_cancel_tag auto_cancel; + + template + struct future + { + using result_type = T; + + future() = default; + future(future&&) = default; + + future(std::shared_ptr> state) + : state_(std::move(state)) + {} + + future & operator = (future&&) = default; + + ~future() + { + reset(); + } + + void reset() + { + if (state_ && state_->auto_cancel) + cancel(); + + state_.reset(); + } + + explicit operator bool() const + { + return static_cast(state_); + } + + bool wait() const + { + if (!state_) return false; + std::unique_lock lock(state_->value_mutex); + state_->value_cv.wait(lock, [this]{ return has_value_unsafe(); }); + return true; + } + + template + bool wait_for(Duration period) const + { + if (!state_) return false; + std::unique_lock lock(state_->value_mutex); + state_->value_cv.wait_for(lock, period, [this]{ return has_value_unsafe(); }); + return has_value_unsafe(); + } + + template + bool wait_until(TimePoint time) const + { + if (!state_) return false; + std::unique_lock lock(state_->value_mutex); + state_->value_cv.wait_until(lock, time, [this]{ return has_value_unsafe(); }); + return has_value_unsafe(); + } + + T get() + { + if (!state_) + throw empty_future_error{}; + if (state_->canceled) + throw canceled_task_error{}; + wait(); + std::lock_guard lock{state_->value_mutex}; + if (state_->value) + { + if constexpr (std::is_same_v) + return; + else + return std::move(*(state_->value)); + } + else + std::rethrow_exception(state_->exception); + } + + void cancel() + { + if (state_) + state_->canceled = true; + } + + bool ready() const + { + return wait_for(std::chrono::seconds{0}); + } + + private: + std::shared_ptr> state_; + + bool has_value_unsafe() const + { + return state_->value || state_->exception; + } + }; + +}