Make async::future shared & support multiple .then() calls

This commit is contained in:
Nikita Lisitsa 2025-09-03 20:20:16 +03:00
parent 78ea54ac2d
commit 8784938e97

View file

@ -55,7 +55,8 @@ namespace psemek::async
std::weak_ptr<cancel_token> weak_cancel; std::weak_ptr<cancel_token> weak_cancel;
std::mutex then_mutex; std::mutex then_mutex;
typename then_function<T>::type then_func; std::vector<typename then_function<T>::type> then_funcs;
std::vector<util::function<void(std::exception_ptr)>> then_exception_funcs;
}; };
} }
@ -85,14 +86,16 @@ namespace psemek::async
using result_type = T; using result_type = T;
future() = default; future() = default;
future(future&&) = default; future(future const &) = default;
future(future &&) = default;
future(std::shared_ptr<detail::task_state<T>> state, std::shared_ptr<detail::cancel_token> cancel) future(std::shared_ptr<detail::task_state<T>> state, std::shared_ptr<detail::cancel_token> cancel)
: state_(std::move(state)) : state_(std::move(state))
, cancel_(std::move(cancel)) , cancel_(std::move(cancel))
{} {}
future & operator = (future&&) = default; future & operator = (future const &) = default;
future & operator = (future &&) = default;
~future() ~future()
{ {
@ -239,8 +242,10 @@ namespace psemek::async
state_->value = value; state_->value = value;
} }
state_->value_cv.notify_all(); state_->value_cv.notify_all();
if (state_->then_func)
state_->then_func(*state_->value); std::lock_guard lock{state_->then_mutex};
for (auto & func : state_->then_funcs)
func(*state_->value);
} }
void set_value(T && value) void set_value(T && value)
@ -252,8 +257,10 @@ namespace psemek::async
state_->value = std::move(value); state_->value = std::move(value);
} }
state_->value_cv.notify_all(); state_->value_cv.notify_all();
if (state_->then_func)
state_->then_func(*state_->value); std::lock_guard lock{state_->then_mutex};
for (auto & func : state_->then_funcs)
func(*state_->value);
} }
void set_exception(std::exception_ptr e) void set_exception(std::exception_ptr e)
@ -263,6 +270,10 @@ namespace psemek::async
std::lock_guard lock{state_->value_mutex}; std::lock_guard lock{state_->value_mutex};
if (state_->value || state_->exception) throw satisfied_promise_error{}; if (state_->value || state_->exception) throw satisfied_promise_error{};
state_->exception = std::move(e); state_->exception = std::move(e);
std::lock_guard lock_then{state_->then_mutex};
for (auto & func : state_->then_exception_funcs)
func(state_->exception);
} }
state_->value_cv.notify_all(); state_->value_cv.notify_all();
} }
@ -324,8 +335,10 @@ namespace psemek::async
state_->value = true; state_->value = true;
} }
state_->value_cv.notify_all(); state_->value_cv.notify_all();
if (state_->then_func)
state_->then_func(); std::lock_guard lock{state_->then_mutex};
for (auto & func : state_->then_funcs)
func();
} }
void set_exception(std::exception_ptr e) void set_exception(std::exception_ptr e)
@ -335,6 +348,10 @@ namespace psemek::async
std::lock_guard lock{state_->value_mutex}; std::lock_guard lock{state_->value_mutex};
if (state_->value || state_->exception) throw satisfied_promise_error{}; if (state_->value || state_->exception) throw satisfied_promise_error{};
state_->exception = std::move(e); state_->exception = std::move(e);
std::lock_guard lock_then{state_->then_mutex};
for (auto & func : state_->then_exception_funcs)
func(state_->exception);
} }
state_->value_cv.notify_all(); state_->value_cv.notify_all();
} }
@ -488,7 +505,7 @@ namespace psemek::async
auto fut = t.get_future(); auto fut = t.get_future();
std::lock_guard lock{state_->then_mutex}; std::lock_guard lock{state_->then_mutex};
state_->then_func = std::move(t); state_->then_funcs.push_back(std::move(t));
return fut; return fut;
} }
else else
@ -498,7 +515,7 @@ namespace psemek::async
auto fut = t.get_future(); auto fut = t.get_future();
std::lock_guard lock{state_->then_mutex}; std::lock_guard lock{state_->then_mutex};
state_->then_func = std::move(t); state_->then_funcs.push_back(std::move(t));
return fut; return fut;
} }
} }