From a8a1f44a89878eb31045aabcb23137a61236f364 Mon Sep 17 00:00:00 2001 From: lisyarus Date: Sat, 21 Nov 2020 15:14:06 +0300 Subject: [PATCH] Support deferred events in synchronyzed_queue & remove timeout methods --- .../psemek/util/synchronyzed_queue.hpp | 221 +++++++++++++----- 1 file changed, 158 insertions(+), 63 deletions(-) diff --git a/libs/util/include/psemek/util/synchronyzed_queue.hpp b/libs/util/include/psemek/util/synchronyzed_queue.hpp index 8a10c036..0afa6db3 100644 --- a/libs/util/include/psemek/util/synchronyzed_queue.hpp +++ b/libs/util/include/psemek/util/synchronyzed_queue.hpp @@ -1,11 +1,12 @@ #pragma once #include +#include #include #include #include -#include #include +#include namespace psemek::util { @@ -13,6 +14,8 @@ namespace psemek::util template struct synchronized_queue { + using clock = std::chrono::high_resolution_clock; + synchronized_queue(std::size_t max_size = std::numeric_limits::max()) noexcept : max_size_(max_size) {} @@ -24,15 +27,16 @@ namespace psemek::util void push(T const & x); void push(T && x); + + void push_at(clock::time_point time, T const & x); + void push_at(clock::time_point time, T && x); + T pop(); bool try_push(T const & x); - template - bool try_push(T const & x, std::chrono::duration const & timeout); + bool try_push(T && x); std::optional try_pop(); - template - std::optional try_pop(std::chrono::duration const & timeout); void clear(); @@ -41,116 +45,207 @@ namespace psemek::util void wait(); std::size_t size() const; + bool empty() const; private: - mutable std::mutex mutex; - std::condition_variable push_cv, pop_cv; - std::deque queue; + struct deferred + { + clock::time_point time; + T value; + }; + + mutable std::mutex mutex_; + std::condition_variable push_cv_, pop_cv_; + std::deque queue_; + std::vector deferred_heap_; std::size_t const max_size_; + + static auto heap_compare(); + + void flush_deferred(); + + std::size_t size_internal() const; + bool empty_internal() const; }; + template + auto synchronized_queue::heap_compare() + { + return [](deferred const & d1, deferred const & d2){ return d1.time > d2.time; }; + } + template void synchronized_queue::push(T const & x) { - std::unique_lock lock{mutex}; - push_cv.wait(lock, [this]{ return queue.size() < max_size(); }); - queue.push_back(x); - pop_cv.notify_one(); + std::unique_lock lock{mutex_}; + push_cv_.wait(lock, [this]{ return size_internal() < max_size(); }); + queue_.push_back(x); + lock.unlock(); + pop_cv_.notify_one(); } template void synchronized_queue::push (T && x) { - std::unique_lock lock{mutex}; - push_cv.wait(lock, [this]{ return queue.size() < max_size(); }); - queue.push_back(std::move(x)); - pop_cv.notify_one(); + std::unique_lock lock{mutex_}; + push_cv_.wait(lock, [this]{ return size_internal() < max_size(); }); + queue_.push_back(std::move(x)); + lock.unlock(); + pop_cv_.notify_one(); + } + + + template + void synchronized_queue::push_at(clock::time_point time, T const & x) + { + std::unique_lock lock{mutex_}; + push_cv_.wait(lock, [this]{ return size_internal() < max_size(); }); + if (time > clock::now()) + { + deferred_heap_.push_back({time, x}); + std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare()); + } + else + { + queue_.push_back(x); + } + lock.unlock(); + pop_cv_.notify_one(); + } + + template + void synchronized_queue::push_at(clock::time_point time, T && x) + { + std::unique_lock lock{mutex_}; + push_cv_.wait(lock, [this]{ return size_internal() < max_size(); }); + if (time > clock::now()) + { + deferred_heap_.push_back({time, std::move(x)}); + std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare()); + } + else + { + queue_.push_back(std::move(x)); + } + lock.unlock(); + pop_cv_.notify_one(); } template T synchronized_queue::pop() { - std::unique_lock lock{mutex}; - pop_cv.wait(lock, [this]{ return !queue.empty(); }); - T x = std::move(queue.front()); - queue.pop_front(); - push_cv.notify_one(); - return x; + std::unique_lock lock{mutex_}; + while (true) + { + pop_cv_.wait(lock, [this]{ return !empty_internal(); }); + flush_deferred(); + if (!queue_.empty()) + { + T x = std::move(queue_.front()); + queue_.pop_front(); + lock.unlock(); + push_cv_.notify_one(); + return x; + } + + pop_cv_.wait_until(lock, deferred_heap_.front().time); + } } template bool synchronized_queue::try_push(T const & x) { - std::lock_guard lock{mutex}; - if (queue.size() >= max_size()) + std::unique_lock lock{mutex_}; + if (size_internal() >= max_size()) return false; - queue.push_back(x); - pop_cv.notify_one(); + queue_.push_back(x); + lock.unlock(); + pop_cv_.notify_one(); return true; } template - template - bool synchronized_queue::try_push(T const & x, std::chrono::duration const & timeout) + bool synchronized_queue::try_push(T && x) { - std::unique_lock lock{mutex}; - if (push_cv.wait_for(lock, timeout, [this]{ return queue.size() < max_size(); })) - { - queue.push_back(std::move(x)); - pop_cv.notify_one(); - return true; - } - return false; + std::unique_lock lock{mutex_}; + if (size_internal() >= max_size()) + return false; + + queue_.push_back(std::move(x)); + lock.unlock(); + pop_cv_.notify_one(); + return true; } template std::optional synchronized_queue::try_pop() { - std::lock_guard lock{mutex}; - if (queue.empty()) + std::unique_lock lock{mutex_}; + flush_deferred(); + if (queue_.empty()) return std::nullopt; - T x = std::move(queue.front()); - queue.pop_front(); - push_cv.notify_one(); + T x = std::move(queue_.front()); + queue_.pop_front(); + lock.unlock(); + push_cv_.notify_one(); return { std::move(x) }; } - template - template - std::optional synchronized_queue::try_pop(std::chrono::duration const & timeout) - { - std::unique_lock lock{mutex}; - if (pop_cv.wait_for(lock, timeout, [this]{ return !queue.empty(); })) - { - T x = std::move(queue.front()); - queue.pop_front(); - push_cv.notify_one(); - return {std::move(x)}; - } - return std::nullopt; - } - template void synchronized_queue::clear() { - std::lock_guard lock{mutex}; - queue.clear(); - push_cv.notify_all(); + std::unique_lock lock{mutex_}; + queue_.clear(); + deferred_heap_.clear(); + lock.unlock(); + push_cv_.notify_all(); } template void synchronized_queue::wait() { - std::unique_lock lock{mutex}; - push_cv.wait(lock, [this]{ return queue.empty(); }); + std::unique_lock lock{mutex_}; + push_cv_.wait(lock, [this]{ return empty_internal(); }); } template std::size_t synchronized_queue::size() const { - std::lock_guard lock{mutex}; - return queue.size(); + std::lock_guard lock{mutex_}; + return size_internal(); + } + + template + bool synchronized_queue::empty() const + { + std::lock_guard lock{mutex_}; + return empty_internal(); + } + + template + void synchronized_queue::flush_deferred() + { + auto const now = clock::now(); + while (!deferred_heap_.empty() && deferred_heap_.front().time <= now) + { + queue_.push_back(std::move(deferred_heap_.front().value)); + std::pop_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare()); + deferred_heap_.pop_back(); + } + } + + + template + std::size_t synchronized_queue::size_internal() const + { + return queue_.size() + deferred_heap_.size(); + } + + template + bool synchronized_queue::empty_internal() const + { + return queue_.empty() && deferred_heap_.empty(); } }