diff --git a/libs/async/include/psemek/async/event_loop.hpp b/libs/async/include/psemek/async/event_loop.hpp index bcde4dba..fc95fdaf 100644 --- a/libs/async/include/psemek/async/event_loop.hpp +++ b/libs/async/include/psemek/async/event_loop.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -28,18 +29,7 @@ namespace psemek::async ~event_loop() override { stop(); } private: - struct deferred_task - { - clock::time_point time; - task func; - }; - - std::deque task_queue_; - std::vector deferred_task_heap_; - - static auto heap_compare(); - - void flush_deferred(); + util::synchronized_queue queue_; }; diff --git a/libs/async/source/event_loop.cpp b/libs/async/source/event_loop.cpp index 27f0f11b..3916b1a1 100644 --- a/libs/async/source/event_loop.cpp +++ b/libs/async/source/event_loop.cpp @@ -5,48 +5,27 @@ namespace psemek::async { - auto event_loop::heap_compare() - { - return [](deferred_task const & t1, deferred_task const & t2){ return t1.time > t2.time; }; - } - void event_loop::post(task t) { - task_queue_.push_back(std::move(t)); + queue_.push(std::move(t)); } void event_loop::post_at(clock::time_point time, task t) { - if (time > clock::now()) - { - deferred_task_heap_.push_back(deferred_task{time, std::move(t)}); - std::push_heap(deferred_task_heap_.begin(), deferred_task_heap_.end(), heap_compare()); - } - else - task_queue_.push_back(std::move(t)); + queue_.push_at(time, std::move(t)); } void event_loop::stop() { - task_queue_.clear(); - deferred_task_heap_.clear(); + queue_.clear(); } void event_loop::wait() { - while (!task_queue_.empty() || !deferred_task_heap_.empty()) + while (!queue_.empty()) { - flush_deferred(); - if (task_queue_.empty()) - { - std::this_thread::sleep_until(deferred_task_heap_.front().time); - } - else - { - auto t = std::move(task_queue_.front()); - task_queue_.pop_front(); - t(); - } + auto t = queue_.pop(); + t(); } } @@ -57,29 +36,17 @@ namespace psemek::async void event_loop::wait_until(clock::time_point time) { - while ((!task_queue_.empty() || !deferred_task_heap_.empty()) && clock::now() < time) + while (!queue_.empty() && clock::now() < time) { - flush_deferred(); - auto t = std::move(task_queue_.front()); - task_queue_.pop_front(); - t(); + auto t = queue_.try_pop_until(time); + if (!t) break; + (*t)(); } } std::size_t event_loop::task_count() const { - return task_queue_.size() + deferred_task_heap_.size(); - } - - void event_loop::flush_deferred() - { - auto const now = clock::now(); - while (!deferred_task_heap_.empty() && deferred_task_heap_.front().time <= now) - { - task_queue_.push_back(std::move(deferred_task_heap_.front().func)); - std::pop_heap(deferred_task_heap_.begin(), deferred_task_heap_.end(), heap_compare()); - deferred_task_heap_.pop_back(); - } + return queue_.size(); } }