From e41e2492ca0c51080db17084db8e91bb8a312055 Mon Sep 17 00:00:00 2001 From: lisyarus Date: Sat, 21 Nov 2020 23:29:17 +0300 Subject: [PATCH] Improve synchronyzed_queue interface: support timed variants of all operations --- .../psemek/util/synchronyzed_queue.hpp | 352 ++++++++++++++++-- 1 file changed, 319 insertions(+), 33 deletions(-) diff --git a/libs/util/include/psemek/util/synchronyzed_queue.hpp b/libs/util/include/psemek/util/synchronyzed_queue.hpp index 0afa6db3..ebdd4e0c 100644 --- a/libs/util/include/psemek/util/synchronyzed_queue.hpp +++ b/libs/util/include/psemek/util/synchronyzed_queue.hpp @@ -28,21 +28,56 @@ namespace psemek::util void push(T const & x); void push(T && x); - void push_at(clock::time_point time, T const & x); - void push_at(clock::time_point time, T && x); - - T pop(); + template + void push_at(TimePoint event_time, T const & x); + template + void push_at(TimePoint event_time, T && x); bool try_push(T const & x); bool try_push(T && x); + template + bool try_push_for(T const & x, Duration wait_period); + template + bool try_push_for(T && x, Duration wait_period); + + template + bool try_push_until(T const & x, TimePoint wait_time); + template + bool try_push_until(T && x, TimePoint wait_time); + + template + bool try_push_at(TimePoint event_time, T const & x); + template + bool try_push_at(TimePoint event_time, T && x); + + template + bool try_push_at_for(TimePoint event_time, T const & x, Duration wait_period); + template + bool try_push_at_for(TimePoint event_time, T && x, Duration wait_period); + + template + bool try_push_at_until(EventTimePoint event_time, T const & x, WaitTimePoint wait_time); + template + bool try_push_at_until(EventTimePoint event_time, T && x, WaitTimePoint wait_time); + + T pop(); + std::optional try_pop(); + template + std::optional try_pop_for(Duration wait_period); + template + std::optional try_pop_until(TimePoint wait_time); void clear(); // Wait for the queue to become empty // e.g. when no new items are going to be pushed void wait(); + template + bool wait_for(Duration wait_period); + template + bool wait_until(TimePoint wait_time); std::size_t size() const; bool empty() const; @@ -85,7 +120,7 @@ namespace psemek::util } template - void synchronized_queue::push (T && x) + void synchronized_queue::push(T && x) { std::unique_lock lock{mutex_}; push_cv_.wait(lock, [this]{ return size_internal() < max_size(); }); @@ -94,15 +129,15 @@ namespace psemek::util pop_cv_.notify_one(); } - template - void synchronized_queue::push_at(clock::time_point time, T const & x) + template + void synchronized_queue::push_at(TimePoint event_time, T const & x) { std::unique_lock lock{mutex_}; push_cv_.wait(lock, [this]{ return size_internal() < max_size(); }); - if (time > clock::now()) + if (event_time > clock::now()) { - deferred_heap_.push_back({time, x}); + deferred_heap_.push_back({std::chrono::time_point_cast(event_time), x}); std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare()); } else @@ -114,13 +149,14 @@ namespace psemek::util } template - void synchronized_queue::push_at(clock::time_point time, T && x) + template + void synchronized_queue::push_at(TimePoint event_time, T && x) { std::unique_lock lock{mutex_}; push_cv_.wait(lock, [this]{ return size_internal() < max_size(); }); - if (time > clock::now()) + if (event_time > clock::now()) { - deferred_heap_.push_back({time, std::move(x)}); + deferred_heap_.push_back({std::chrono::time_point_cast(event_time), std::move(x)}); std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare()); } else @@ -131,27 +167,6 @@ namespace psemek::util pop_cv_.notify_one(); } - template - T synchronized_queue::pop() - { - std::unique_lock lock{mutex_}; - while (true) - { - pop_cv_.wait(lock, [this]{ return !empty_internal(); }); - flush_deferred(); - if (!queue_.empty()) - { - T x = std::move(queue_.front()); - queue_.pop_front(); - lock.unlock(); - push_cv_.notify_one(); - return x; - } - - pop_cv_.wait_until(lock, deferred_heap_.front().time); - } - } - template bool synchronized_queue::try_push(T const & x) { @@ -178,6 +193,223 @@ namespace psemek::util return true; } + template + template + bool synchronized_queue::try_push_for(T const & x, Duration wait_period) + { + std::unique_lock lock{mutex_}; + push_cv_.wait_for(lock, wait_period, [this]{ return size_internal() < max_size(); }); + if (size_internal() >= max_size()) + return false; + + queue_.push_back(x); + lock.unlock(); + pop_cv_.notify_one(); + return true; + } + + template + template + bool synchronized_queue::try_push_for(T && x, Duration wait_period) + { + std::unique_lock lock{mutex_}; + push_cv_.wait_for(lock, wait_period, [this]{ return size_internal() < max_size(); }); + if (size_internal() >= max_size()) + return false; + + queue_.push_back(std::move(x)); + lock.unlock(); + pop_cv_.notify_one(); + return true; + } + + template + template + bool synchronized_queue::try_push_until(T const & x, TimePoint wait_time) + { + std::unique_lock lock{mutex_}; + push_cv_.wait_until(lock, wait_time, [this]{ return size_internal() < max_size(); }); + if (size_internal() >= max_size()) + return false; + + queue_.push_back(x); + lock.unlock(); + pop_cv_.notify_one(); + return true; + } + + template + template + bool synchronized_queue::try_push_until(T && x, TimePoint wait_time) + { + std::unique_lock lock{mutex_}; + push_cv_.wait_until(lock, wait_time, [this]{ return size_internal() < max_size(); }); + if (size_internal() >= max_size()) + return false; + + queue_.push_back(std::move(x)); + lock.unlock(); + pop_cv_.notify_one(); + return true; + } + + template + template + bool synchronized_queue::try_push_at(TimePoint event_time, T const & x) + { + std::unique_lock lock{mutex_}; + if (size_internal() >= max_size()) + return false; + + if (event_time > clock::now()) + { + deferred_heap_.push_back({std::chrono::time_point_cast(event_time), x}); + std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare()); + } + else + { + queue_.push_back(x); + } + lock.unlock(); + pop_cv_.notify_one(); + return true; + } + + template + template + bool synchronized_queue::try_push_at(TimePoint event_time, T && x) + { + std::unique_lock lock{mutex_}; + if (size_internal() >= max_size()) + return false; + + if (event_time > clock::now()) + { + deferred_heap_.push_back({std::chrono::time_point_cast(event_time), std::move(x)}); + std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare()); + } + else + { + queue_.push_back(std::move(x)); + } + lock.unlock(); + pop_cv_.notify_one(); + return true; + } + + template + template + bool synchronized_queue::try_push_at_for(TimePoint event_time, T const & x, Duration wait_period) + { + std::unique_lock lock{mutex_}; + push_cv_.wait_for(lock, wait_period, [this]{ return size_internal() < max_size(); }); + if (size_internal() >= max_size()) + return false; + + if (event_time > clock::now()) + { + deferred_heap_.push_back({std::chrono::time_point_cast(event_time), x}); + std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare()); + } + else + { + queue_.push_back(x); + } + lock.unlock(); + pop_cv_.notify_one(); + return true; + } + + template + template + bool synchronized_queue::try_push_at_for(TimePoint event_time, T && x, Duration wait_period) + { + std::unique_lock lock{mutex_}; + push_cv_.wait_for(lock, wait_period, [this]{ return size_internal() < max_size(); }); + if (size_internal() >= max_size()) + return false; + + if (event_time > clock::now()) + { + deferred_heap_.push_back({std::chrono::time_point_cast(event_time), std::move(x)}); + std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare()); + } + else + { + queue_.push_back(std::move(x)); + } + lock.unlock(); + pop_cv_.notify_one(); + return true; + } + + template + template + bool synchronized_queue::try_push_at_until(EventTimePoint event_time, T const & x, WaitTimePoint wait_time) + { + std::unique_lock lock{mutex_}; + push_cv_.wait_until(lock, wait_time, [this]{ return size_internal() < max_size(); }); + if (size_internal() >= max_size()) + return false; + + if (event_time > clock::now()) + { + deferred_heap_.push_back({std::chrono::time_point_cast(event_time), x}); + std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare()); + } + else + { + queue_.push_back(x); + } + lock.unlock(); + pop_cv_.notify_one(); + return true; + } + + template + template + bool synchronized_queue::try_push_at_until(EventTimePoint event_time, T && x, WaitTimePoint wait_time) + { + std::unique_lock lock{mutex_}; + push_cv_.wait_until(lock, wait_time, [this]{ return size_internal() < max_size(); }); + if (size_internal() >= max_size()) + return false; + + if (event_time > clock::now()) + { + deferred_heap_.push_back({std::chrono::time_point_cast(event_time), std::move(x)}); + std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare()); + } + else + { + queue_.push_back(std::move(x)); + } + lock.unlock(); + pop_cv_.notify_one(); + return true; + } + + template + T synchronized_queue::pop() + { + std::unique_lock lock{mutex_}; + while (true) + { + pop_cv_.wait(lock, [this]{ return !empty_internal(); }); + flush_deferred(); + if (!queue_.empty()) + { + T x = std::move(queue_.front()); + queue_.pop_front(); + lock.unlock(); + push_cv_.notify_one(); + return x; + } + + pop_cv_.wait_until(lock, deferred_heap_.front().time); + } + } + template std::optional synchronized_queue::try_pop() { @@ -192,6 +424,44 @@ namespace psemek::util return { std::move(x) }; } + template + template + std::optional synchronized_queue::try_pop_for(Duration wait_period) + { + return try_pop_until(clock::now() + wait_period); + } + + template + template + std::optional synchronized_queue::try_pop_until(TimePoint wait_time) + { + std::unique_lock lock{mutex_}; + while (clock::now() < wait_time) + { + pop_cv_.wait_until(lock, wait_time, [this]{ return !empty_internal(); }); + flush_deferred(); + if (!queue_.empty()) + { + T x = std::move(queue_.front()); + queue_.pop_front(); + lock.unlock(); + push_cv_.notify_one(); + return x; + } + + // here, queue_.empty() + + if (deferred_heap_.empty()) + { + // queue_.empty() && deferred_heap_.empty() mean than the wait_until exited by timeout + return std::nullopt; + } + + pop_cv_.wait_until(lock, std::min(deferred_heap_.front().time, wait_time)); + } + return std::nullopt; + } + template void synchronized_queue::clear() { @@ -209,6 +479,22 @@ namespace psemek::util push_cv_.wait(lock, [this]{ return empty_internal(); }); } + template + template + bool synchronized_queue::wait_for(Duration wait_period) + { + std::unique_lock lock{mutex_}; + return push_cv_.wait_for(lock, wait_period, [this]{ return empty_internal(); }); + } + + template + template + bool synchronized_queue::wait_until(TimePoint wait_time) + { + std::unique_lock lock{mutex_}; + return push_cv_.wait_until(lock, wait_time, [this]{ return empty_internal(); }); + } + template std::size_t synchronized_queue::size() const {