Add deferred event support to executors

This commit is contained in:
Nikita Lisitsa 2020-11-21 15:16:47 +03:00
parent a8a1f44a89
commit 240d26663b
5 changed files with 126 additions and 34 deletions

View file

@ -3,6 +3,7 @@
#include <psemek/util/executor.hpp> #include <psemek/util/executor.hpp>
#include <deque> #include <deque>
#include <vector>
namespace psemek::util namespace psemek::util
{ {
@ -12,7 +13,9 @@ namespace psemek::util
{ {
void post(task t) override; void post(task t) override;
void stop() override {} void post_at(clock::time_point time, task t) override;
void stop() override;
void wait() override; void wait() override;
@ -20,12 +23,23 @@ namespace psemek::util
void wait_until(clock::time_point time) override; void wait_until(clock::time_point time) override;
std::size_t task_count() const override { return task_queue_.size(); } std::size_t task_count() const override;
~event_loop() override { stop(); } ~event_loop() override { stop(); }
private: private:
struct deferred_task
{
clock::time_point time;
task func;
};
std::deque<task> task_queue_; std::deque<task> task_queue_;
std::vector<deferred_task> deferred_task_heap_;
static auto heap_compare();
void flush_deferred();
}; };

View file

@ -19,6 +19,31 @@ namespace psemek::util
std::atomic<bool> canceled = false; std::atomic<bool> canceled = false;
}; };
template <typename T, typename F, typename ... Args>
auto wrap_task(std::shared_ptr<task_state<T>> state, F && f, Args && ... args)
{
return [state, f = std::forward<F>(f), ... args = std::forward<Args>(args)]() mutable {
if (state->canceled) return;
try
{
if constexpr (std::is_same_v<T, void>)
{
std::forward<F>(f)(std::forward<Args>(args)...);
state->promise.set_value();
}
else
{
state->promise.set_value(std::forward<F>(f)(std::forward<Args>(args)...));
}
}
catch(...)
{
state->promise.set_exception(std::current_exception());
}
};
}
} }
struct canceled_task_error struct canceled_task_error
@ -81,6 +106,10 @@ namespace psemek::util
// NB: it is better to use the "dispatch" method. // NB: it is better to use the "dispatch" method.
virtual void post(task t) = 0; virtual void post(task t) = 0;
// Post the task for execution at a certain time point.
// NB: it is better to use the "dispatch_at" method.
virtual void post_at(clock::time_point time, task t) = 0;
// Stop the executor. No tasks will be executed // Stop the executor. No tasks will be executed
// after this function returns. // after this function returns.
// NB: the executor must call stop() from destructor. // NB: the executor must call stop() from destructor.
@ -109,6 +138,11 @@ namespace psemek::util
template <typename F, typename ... Args> template <typename F, typename ... Args>
auto dispatch(F && f, Args && ... args); auto dispatch(F && f, Args && ... args);
// Post a callable for execution at a certain time point.
// Retuns a future.
template <typename TimePoint, typename F, typename ... Args>
auto dispatch_at(TimePoint time, F && f, Args && ... args);
virtual ~executor() {} virtual ~executor() {}
}; };
@ -116,32 +150,17 @@ namespace psemek::util
auto executor::dispatch(F && f, Args && ... args) auto executor::dispatch(F && f, Args && ... args)
{ {
using R = decltype(f()); using R = decltype(f());
auto state = std::make_shared<detail::task_state<R>>(); auto state = std::make_shared<detail::task_state<R>>();
post(detail::wrap_task(state, std::forward<F>(f), std::forward<Args>(args)...));
return future<R>(state);
}
auto func = [state, f = std::forward<F>(f), ... args = std::forward<Args>(args)]() mutable { template <typename TimePoint, typename F, typename ... Args>
if (state->canceled) return; auto executor::dispatch_at(TimePoint time, F && f, Args && ... args)
{
try using R = decltype(f());
{ auto state = std::make_shared<detail::task_state<R>>();
if constexpr (std::is_same_v<R, void>) post_at(std::chrono::time_point_cast<clock::time_point>(time), detail::wrap_task(state, std::forward<F>(f), std::forward<Args>(args)...));
{
std::forward<F>(f)(std::forward<Args>(args)...);
state->promise.set_value();
}
else
{
state->promise.set_value(std::forward<F>(f)(std::forward<Args>(args)...));
}
}
catch(...)
{
state->promise.set_exception(std::current_exception());
}
};
post(func);
return future<R>(state); return future<R>(state);
} }

View file

@ -27,6 +27,8 @@ namespace psemek::util
void post(task t) override; void post(task t) override;
void post_at(clock::time_point time, task t) override;
void stop() override; void stop() override;
void wait() override; void wait() override;
@ -40,7 +42,6 @@ namespace psemek::util
private: private:
std::vector<util::thread> threads_; std::vector<util::thread> threads_;
util::synchronized_queue<task> task_queue_; util::synchronized_queue<task> task_queue_;
std::atomic<bool> running_;
std::size_t working_count_; std::size_t working_count_;
mutable std::mutex working_count_mutex_; mutable std::mutex working_count_mutex_;

View file

@ -1,20 +1,52 @@
#include <psemek/util/event_loop.hpp> #include <psemek/util/event_loop.hpp>
#include <algorithm>
namespace psemek::util namespace psemek::util
{ {
auto event_loop::heap_compare()
{
return [](deferred_task const & t1, deferred_task const & t2){ return t1.time > t2.time; };
}
void event_loop::post(task t) void event_loop::post(task t)
{ {
task_queue_.push_back(std::move(t)); task_queue_.push_back(std::move(t));
} }
void event_loop::post_at(clock::time_point time, task t)
{
if (time > clock::now())
{
deferred_task_heap_.push_back(deferred_task{time, std::move(t)});
std::push_heap(deferred_task_heap_.begin(), deferred_task_heap_.end(), heap_compare());
}
else
task_queue_.push_back(std::move(t));
}
void event_loop::stop()
{
task_queue_.clear();
deferred_task_heap_.clear();
}
void event_loop::wait() void event_loop::wait()
{ {
while (!task_queue_.empty()) while (!task_queue_.empty() || !deferred_task_heap_.empty())
{ {
auto t = std::move(task_queue_.front()); flush_deferred();
task_queue_.pop_front(); if (task_queue_.empty())
t(); {
std::this_thread::sleep_until(deferred_task_heap_.front().time);
}
else
{
auto t = std::move(task_queue_.front());
task_queue_.pop_front();
t();
}
} }
} }
@ -25,12 +57,29 @@ namespace psemek::util
void event_loop::wait_until(clock::time_point time) void event_loop::wait_until(clock::time_point time)
{ {
while (!task_queue_.empty() && clock::now() < time) while ((!task_queue_.empty() || !deferred_task_heap_.empty()) && clock::now() < time)
{ {
flush_deferred();
auto t = std::move(task_queue_.front()); auto t = std::move(task_queue_.front());
task_queue_.pop_front(); task_queue_.pop_front();
t(); t();
} }
} }
std::size_t event_loop::task_count() const
{
return task_queue_.size() + deferred_task_heap_.size();
}
void event_loop::flush_deferred()
{
auto const now = clock::now();
while (!deferred_task_heap_.empty() && deferred_task_heap_.front().time <= now)
{
task_queue_.push_back(std::move(deferred_task_heap_.front().func));
std::pop_heap(deferred_task_heap_.begin(), deferred_task_heap_.end(), heap_compare());
deferred_task_heap_.pop_back();
}
}
} }

View file

@ -16,8 +16,7 @@ namespace psemek::util
} }
threadpool::threadpool(std::string const & name, std::size_t thread_count) threadpool::threadpool(std::string const & name, std::size_t thread_count)
: running_{true} : working_count_{0}
, working_count_{0}
{ {
for (std::size_t th = 0; th < thread_count; ++th) for (std::size_t th = 0; th < thread_count; ++th)
{ {
@ -65,6 +64,16 @@ namespace psemek::util
task_queue_.push(std::move(t)); task_queue_.push(std::move(t));
} }
void threadpool::post_at(clock::time_point time, task t)
{
if (time > clock::now())
{
task_queue_.push_at(time, std::move(t));
}
else
post(std::move(t));
}
void threadpool::stop() void threadpool::stop()
{ {
task_queue_.clear(); task_queue_.clear();