Implement event_loop using synchronyzed_queue

This commit is contained in:
Nikita Lisitsa 2020-11-21 23:29:39 +03:00
parent e41e2492ca
commit 0f1479abbd
2 changed files with 13 additions and 56 deletions

View file

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <psemek/async/executor.hpp> #include <psemek/async/executor.hpp>
#include <psemek/util/synchronyzed_queue.hpp>
#include <deque> #include <deque>
#include <vector> #include <vector>
@ -28,18 +29,7 @@ namespace psemek::async
~event_loop() override { stop(); } ~event_loop() override { stop(); }
private: private:
struct deferred_task util::synchronized_queue<task> queue_;
{
clock::time_point time;
task func;
};
std::deque<task> task_queue_;
std::vector<deferred_task> deferred_task_heap_;
static auto heap_compare();
void flush_deferred();
}; };

View file

@ -5,48 +5,27 @@
namespace psemek::async namespace psemek::async
{ {
auto event_loop::heap_compare()
{
return [](deferred_task const & t1, deferred_task const & t2){ return t1.time > t2.time; };
}
void event_loop::post(task t) void event_loop::post(task t)
{ {
task_queue_.push_back(std::move(t)); queue_.push(std::move(t));
} }
void event_loop::post_at(clock::time_point time, task t) void event_loop::post_at(clock::time_point time, task t)
{ {
if (time > clock::now()) queue_.push_at(time, std::move(t));
{
deferred_task_heap_.push_back(deferred_task{time, std::move(t)});
std::push_heap(deferred_task_heap_.begin(), deferred_task_heap_.end(), heap_compare());
}
else
task_queue_.push_back(std::move(t));
} }
void event_loop::stop() void event_loop::stop()
{ {
task_queue_.clear(); queue_.clear();
deferred_task_heap_.clear();
} }
void event_loop::wait() void event_loop::wait()
{ {
while (!task_queue_.empty() || !deferred_task_heap_.empty()) while (!queue_.empty())
{ {
flush_deferred(); auto t = queue_.pop();
if (task_queue_.empty()) t();
{
std::this_thread::sleep_until(deferred_task_heap_.front().time);
}
else
{
auto t = std::move(task_queue_.front());
task_queue_.pop_front();
t();
}
} }
} }
@ -57,29 +36,17 @@ namespace psemek::async
void event_loop::wait_until(clock::time_point time) void event_loop::wait_until(clock::time_point time)
{ {
while ((!task_queue_.empty() || !deferred_task_heap_.empty()) && clock::now() < time) while (!queue_.empty() && clock::now() < time)
{ {
flush_deferred(); auto t = queue_.try_pop_until(time);
auto t = std::move(task_queue_.front()); if (!t) break;
task_queue_.pop_front(); (*t)();
t();
} }
} }
std::size_t event_loop::task_count() const std::size_t event_loop::task_count() const
{ {
return task_queue_.size() + deferred_task_heap_.size(); return queue_.size();
}
void event_loop::flush_deferred()
{
auto const now = clock::now();
while (!deferred_task_heap_.empty() && deferred_task_heap_.front().time <= now)
{
task_queue_.push_back(std::move(deferred_task_heap_.front().func));
std::pop_heap(deferred_task_heap_.begin(), deferred_task_heap_.end(), heap_compare());
deferred_task_heap_.pop_back();
}
} }
} }