Improve synchronyzed_queue interface: support timed variants of all operations

This commit is contained in:
Nikita Lisitsa 2020-11-21 23:29:17 +03:00
parent ff5483bfef
commit e41e2492ca

View file

@ -28,21 +28,56 @@ namespace psemek::util
void push(T const & x);
void push(T && x);
void push_at(clock::time_point time, T const & x);
void push_at(clock::time_point time, T && x);
T pop();
template <typename TimePoint>
void push_at(TimePoint event_time, T const & x);
template <typename TimePoint>
void push_at(TimePoint event_time, T && x);
bool try_push(T const & x);
bool try_push(T && x);
template <typename Duration>
bool try_push_for(T const & x, Duration wait_period);
template <typename Duration>
bool try_push_for(T && x, Duration wait_period);
template <typename TimePoint>
bool try_push_until(T const & x, TimePoint wait_time);
template <typename TimePoint>
bool try_push_until(T && x, TimePoint wait_time);
template <typename TimePoint>
bool try_push_at(TimePoint event_time, T const & x);
template <typename TimePoint>
bool try_push_at(TimePoint event_time, T && x);
template <typename TimePoint, typename Duration>
bool try_push_at_for(TimePoint event_time, T const & x, Duration wait_period);
template <typename TimePoint, typename Duration>
bool try_push_at_for(TimePoint event_time, T && x, Duration wait_period);
template <typename EventTimePoint, typename WaitTimePoint>
bool try_push_at_until(EventTimePoint event_time, T const & x, WaitTimePoint wait_time);
template <typename EventTimePoint, typename WaitTimePoint>
bool try_push_at_until(EventTimePoint event_time, T && x, WaitTimePoint wait_time);
T pop();
std::optional<T> try_pop();
template <typename Duration>
std::optional<T> try_pop_for(Duration wait_period);
template <typename TimePoint>
std::optional<T> try_pop_until(TimePoint wait_time);
void clear();
// Wait for the queue to become empty
// e.g. when no new items are going to be pushed
void wait();
template <typename Duration>
bool wait_for(Duration wait_period);
template <typename TimePoint>
bool wait_until(TimePoint wait_time);
std::size_t size() const;
bool empty() const;
@ -85,7 +120,7 @@ namespace psemek::util
}
template <typename T>
void synchronized_queue<T>::push (T && x)
void synchronized_queue<T>::push(T && x)
{
std::unique_lock lock{mutex_};
push_cv_.wait(lock, [this]{ return size_internal() < max_size(); });
@ -94,15 +129,15 @@ namespace psemek::util
pop_cv_.notify_one();
}
template <typename T>
void synchronized_queue<T>::push_at(clock::time_point time, T const & x)
template <typename TimePoint>
void synchronized_queue<T>::push_at(TimePoint event_time, T const & x)
{
std::unique_lock lock{mutex_};
push_cv_.wait(lock, [this]{ return size_internal() < max_size(); });
if (time > clock::now())
if (event_time > clock::now())
{
deferred_heap_.push_back({time, x});
deferred_heap_.push_back({std::chrono::time_point_cast<clock::duration>(event_time), x});
std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare());
}
else
@ -114,13 +149,14 @@ namespace psemek::util
}
template <typename T>
void synchronized_queue<T>::push_at(clock::time_point time, T && x)
template <typename TimePoint>
void synchronized_queue<T>::push_at(TimePoint event_time, T && x)
{
std::unique_lock lock{mutex_};
push_cv_.wait(lock, [this]{ return size_internal() < max_size(); });
if (time > clock::now())
if (event_time > clock::now())
{
deferred_heap_.push_back({time, std::move(x)});
deferred_heap_.push_back({std::chrono::time_point_cast<clock::duration>(event_time), std::move(x)});
std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare());
}
else
@ -131,27 +167,6 @@ namespace psemek::util
pop_cv_.notify_one();
}
template <typename T>
T synchronized_queue<T>::pop()
{
std::unique_lock lock{mutex_};
while (true)
{
pop_cv_.wait(lock, [this]{ return !empty_internal(); });
flush_deferred();
if (!queue_.empty())
{
T x = std::move(queue_.front());
queue_.pop_front();
lock.unlock();
push_cv_.notify_one();
return x;
}
pop_cv_.wait_until(lock, deferred_heap_.front().time);
}
}
template <typename T>
bool synchronized_queue<T>::try_push(T const & x)
{
@ -178,6 +193,223 @@ namespace psemek::util
return true;
}
template <typename T>
template <typename Duration>
bool synchronized_queue<T>::try_push_for(T const & x, Duration wait_period)
{
std::unique_lock lock{mutex_};
push_cv_.wait_for(lock, wait_period, [this]{ return size_internal() < max_size(); });
if (size_internal() >= max_size())
return false;
queue_.push_back(x);
lock.unlock();
pop_cv_.notify_one();
return true;
}
template <typename T>
template <typename Duration>
bool synchronized_queue<T>::try_push_for(T && x, Duration wait_period)
{
std::unique_lock lock{mutex_};
push_cv_.wait_for(lock, wait_period, [this]{ return size_internal() < max_size(); });
if (size_internal() >= max_size())
return false;
queue_.push_back(std::move(x));
lock.unlock();
pop_cv_.notify_one();
return true;
}
template <typename T>
template <typename TimePoint>
bool synchronized_queue<T>::try_push_until(T const & x, TimePoint wait_time)
{
std::unique_lock lock{mutex_};
push_cv_.wait_until(lock, wait_time, [this]{ return size_internal() < max_size(); });
if (size_internal() >= max_size())
return false;
queue_.push_back(x);
lock.unlock();
pop_cv_.notify_one();
return true;
}
template <typename T>
template <typename TimePoint>
bool synchronized_queue<T>::try_push_until(T && x, TimePoint wait_time)
{
std::unique_lock lock{mutex_};
push_cv_.wait_until(lock, wait_time, [this]{ return size_internal() < max_size(); });
if (size_internal() >= max_size())
return false;
queue_.push_back(std::move(x));
lock.unlock();
pop_cv_.notify_one();
return true;
}
template <typename T>
template <typename TimePoint>
bool synchronized_queue<T>::try_push_at(TimePoint event_time, T const & x)
{
std::unique_lock lock{mutex_};
if (size_internal() >= max_size())
return false;
if (event_time > clock::now())
{
deferred_heap_.push_back({std::chrono::time_point_cast<clock::duration>(event_time), x});
std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare());
}
else
{
queue_.push_back(x);
}
lock.unlock();
pop_cv_.notify_one();
return true;
}
template <typename T>
template <typename TimePoint>
bool synchronized_queue<T>::try_push_at(TimePoint event_time, T && x)
{
std::unique_lock lock{mutex_};
if (size_internal() >= max_size())
return false;
if (event_time > clock::now())
{
deferred_heap_.push_back({std::chrono::time_point_cast<clock::duration>(event_time), std::move(x)});
std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare());
}
else
{
queue_.push_back(std::move(x));
}
lock.unlock();
pop_cv_.notify_one();
return true;
}
template <typename T>
template <typename TimePoint, typename Duration>
bool synchronized_queue<T>::try_push_at_for(TimePoint event_time, T const & x, Duration wait_period)
{
std::unique_lock lock{mutex_};
push_cv_.wait_for(lock, wait_period, [this]{ return size_internal() < max_size(); });
if (size_internal() >= max_size())
return false;
if (event_time > clock::now())
{
deferred_heap_.push_back({std::chrono::time_point_cast<clock::duration>(event_time), x});
std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare());
}
else
{
queue_.push_back(x);
}
lock.unlock();
pop_cv_.notify_one();
return true;
}
template <typename T>
template <typename TimePoint, typename Duration>
bool synchronized_queue<T>::try_push_at_for(TimePoint event_time, T && x, Duration wait_period)
{
std::unique_lock lock{mutex_};
push_cv_.wait_for(lock, wait_period, [this]{ return size_internal() < max_size(); });
if (size_internal() >= max_size())
return false;
if (event_time > clock::now())
{
deferred_heap_.push_back({std::chrono::time_point_cast<clock::duration>(event_time), std::move(x)});
std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare());
}
else
{
queue_.push_back(std::move(x));
}
lock.unlock();
pop_cv_.notify_one();
return true;
}
template <typename T>
template <typename EventTimePoint, typename WaitTimePoint>
bool synchronized_queue<T>::try_push_at_until(EventTimePoint event_time, T const & x, WaitTimePoint wait_time)
{
std::unique_lock lock{mutex_};
push_cv_.wait_until(lock, wait_time, [this]{ return size_internal() < max_size(); });
if (size_internal() >= max_size())
return false;
if (event_time > clock::now())
{
deferred_heap_.push_back({std::chrono::time_point_cast<clock::duration>(event_time), x});
std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare());
}
else
{
queue_.push_back(x);
}
lock.unlock();
pop_cv_.notify_one();
return true;
}
template <typename T>
template <typename EventTimePoint, typename WaitTimePoint>
bool synchronized_queue<T>::try_push_at_until(EventTimePoint event_time, T && x, WaitTimePoint wait_time)
{
std::unique_lock lock{mutex_};
push_cv_.wait_until(lock, wait_time, [this]{ return size_internal() < max_size(); });
if (size_internal() >= max_size())
return false;
if (event_time > clock::now())
{
deferred_heap_.push_back({std::chrono::time_point_cast<clock::duration>(event_time), std::move(x)});
std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare());
}
else
{
queue_.push_back(std::move(x));
}
lock.unlock();
pop_cv_.notify_one();
return true;
}
template <typename T>
T synchronized_queue<T>::pop()
{
std::unique_lock lock{mutex_};
while (true)
{
pop_cv_.wait(lock, [this]{ return !empty_internal(); });
flush_deferred();
if (!queue_.empty())
{
T x = std::move(queue_.front());
queue_.pop_front();
lock.unlock();
push_cv_.notify_one();
return x;
}
pop_cv_.wait_until(lock, deferred_heap_.front().time);
}
}
template <typename T>
std::optional<T> synchronized_queue<T>::try_pop()
{
@ -192,6 +424,44 @@ namespace psemek::util
return { std::move(x) };
}
template <typename T>
template <typename Duration>
std::optional<T> synchronized_queue<T>::try_pop_for(Duration wait_period)
{
return try_pop_until(clock::now() + wait_period);
}
template <typename T>
template <typename TimePoint>
std::optional<T> synchronized_queue<T>::try_pop_until(TimePoint wait_time)
{
std::unique_lock lock{mutex_};
while (clock::now() < wait_time)
{
pop_cv_.wait_until(lock, wait_time, [this]{ return !empty_internal(); });
flush_deferred();
if (!queue_.empty())
{
T x = std::move(queue_.front());
queue_.pop_front();
lock.unlock();
push_cv_.notify_one();
return x;
}
// here, queue_.empty()
if (deferred_heap_.empty())
{
// queue_.empty() && deferred_heap_.empty() mean than the wait_until exited by timeout
return std::nullopt;
}
pop_cv_.wait_until(lock, std::min(deferred_heap_.front().time, wait_time));
}
return std::nullopt;
}
template <typename T>
void synchronized_queue<T>::clear()
{
@ -209,6 +479,22 @@ namespace psemek::util
push_cv_.wait(lock, [this]{ return empty_internal(); });
}
template <typename T>
template <typename Duration>
bool synchronized_queue<T>::wait_for(Duration wait_period)
{
std::unique_lock lock{mutex_};
return push_cv_.wait_for(lock, wait_period, [this]{ return empty_internal(); });
}
template <typename T>
template <typename TimePoint>
bool synchronized_queue<T>::wait_until(TimePoint wait_time)
{
std::unique_lock lock{mutex_};
return push_cv_.wait_until(lock, wait_time, [this]{ return empty_internal(); });
}
template <typename T>
std::size_t synchronized_queue<T>::size() const
{