Support deferred events in synchronyzed_queue & remove timeout methods

This commit is contained in:
Nikita Lisitsa 2020-11-21 15:14:06 +03:00
parent 4fa7e1f824
commit a8a1f44a89

View file

@ -1,11 +1,12 @@
#pragma once
#include <deque>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <optional>
#include <thread>
#include <chrono>
#include <thread>
namespace psemek::util
{
@ -13,6 +14,8 @@ namespace psemek::util
template <typename T>
struct synchronized_queue
{
using clock = std::chrono::high_resolution_clock;
synchronized_queue(std::size_t max_size = std::numeric_limits<std::size_t>::max()) noexcept
: max_size_(max_size)
{}
@ -24,15 +27,16 @@ namespace psemek::util
void push(T const & x);
void push(T && x);
void push_at(clock::time_point time, T const & x);
void push_at(clock::time_point time, T && x);
T pop();
bool try_push(T const & x);
template <typename Rep, typename Period>
bool try_push(T const & x, std::chrono::duration<Rep, Period> const & timeout);
bool try_push(T && x);
std::optional<T> try_pop();
template <typename Rep, typename Period>
std::optional<T> try_pop(std::chrono::duration<Rep, Period> const & timeout);
void clear();
@ -41,116 +45,207 @@ namespace psemek::util
void wait();
std::size_t size() const;
bool empty() const;
private:
mutable std::mutex mutex;
std::condition_variable push_cv, pop_cv;
std::deque<T> queue;
struct deferred
{
clock::time_point time;
T value;
};
mutable std::mutex mutex_;
std::condition_variable push_cv_, pop_cv_;
std::deque<T> queue_;
std::vector<deferred> deferred_heap_;
std::size_t const max_size_;
static auto heap_compare();
void flush_deferred();
std::size_t size_internal() const;
bool empty_internal() const;
};
template <typename T>
auto synchronized_queue<T>::heap_compare()
{
return [](deferred const & d1, deferred const & d2){ return d1.time > d2.time; };
}
template <typename T>
void synchronized_queue<T>::push(T const & x)
{
std::unique_lock lock{mutex};
push_cv.wait(lock, [this]{ return queue.size() < max_size(); });
queue.push_back(x);
pop_cv.notify_one();
std::unique_lock lock{mutex_};
push_cv_.wait(lock, [this]{ return size_internal() < max_size(); });
queue_.push_back(x);
lock.unlock();
pop_cv_.notify_one();
}
template <typename T>
void synchronized_queue<T>::push (T && x)
{
std::unique_lock lock{mutex};
push_cv.wait(lock, [this]{ return queue.size() < max_size(); });
queue.push_back(std::move(x));
pop_cv.notify_one();
std::unique_lock lock{mutex_};
push_cv_.wait(lock, [this]{ return size_internal() < max_size(); });
queue_.push_back(std::move(x));
lock.unlock();
pop_cv_.notify_one();
}
template <typename T>
void synchronized_queue<T>::push_at(clock::time_point time, T const & x)
{
std::unique_lock lock{mutex_};
push_cv_.wait(lock, [this]{ return size_internal() < max_size(); });
if (time > clock::now())
{
deferred_heap_.push_back({time, x});
std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare());
}
else
{
queue_.push_back(x);
}
lock.unlock();
pop_cv_.notify_one();
}
template <typename T>
void synchronized_queue<T>::push_at(clock::time_point time, T && x)
{
std::unique_lock lock{mutex_};
push_cv_.wait(lock, [this]{ return size_internal() < max_size(); });
if (time > clock::now())
{
deferred_heap_.push_back({time, std::move(x)});
std::push_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare());
}
else
{
queue_.push_back(std::move(x));
}
lock.unlock();
pop_cv_.notify_one();
}
template <typename T>
T synchronized_queue<T>::pop()
{
std::unique_lock lock{mutex};
pop_cv.wait(lock, [this]{ return !queue.empty(); });
T x = std::move(queue.front());
queue.pop_front();
push_cv.notify_one();
return x;
std::unique_lock lock{mutex_};
while (true)
{
pop_cv_.wait(lock, [this]{ return !empty_internal(); });
flush_deferred();
if (!queue_.empty())
{
T x = std::move(queue_.front());
queue_.pop_front();
lock.unlock();
push_cv_.notify_one();
return x;
}
pop_cv_.wait_until(lock, deferred_heap_.front().time);
}
}
template <typename T>
bool synchronized_queue<T>::try_push(T const & x)
{
std::lock_guard lock{mutex};
if (queue.size() >= max_size())
std::unique_lock lock{mutex_};
if (size_internal() >= max_size())
return false;
queue.push_back(x);
pop_cv.notify_one();
queue_.push_back(x);
lock.unlock();
pop_cv_.notify_one();
return true;
}
template <typename T>
template <typename Rep, typename Period>
bool synchronized_queue<T>::try_push(T const & x, std::chrono::duration<Rep, Period> const & timeout)
bool synchronized_queue<T>::try_push(T && x)
{
std::unique_lock lock{mutex};
if (push_cv.wait_for(lock, timeout, [this]{ return queue.size() < max_size(); }))
{
queue.push_back(std::move(x));
pop_cv.notify_one();
return true;
}
return false;
std::unique_lock lock{mutex_};
if (size_internal() >= max_size())
return false;
queue_.push_back(std::move(x));
lock.unlock();
pop_cv_.notify_one();
return true;
}
template <typename T>
std::optional<T> synchronized_queue<T>::try_pop()
{
std::lock_guard lock{mutex};
if (queue.empty())
std::unique_lock lock{mutex_};
flush_deferred();
if (queue_.empty())
return std::nullopt;
T x = std::move(queue.front());
queue.pop_front();
push_cv.notify_one();
T x = std::move(queue_.front());
queue_.pop_front();
lock.unlock();
push_cv_.notify_one();
return { std::move(x) };
}
template <typename T>
template <typename Rep, typename Period>
std::optional<T> synchronized_queue<T>::try_pop(std::chrono::duration<Rep, Period> const & timeout)
{
std::unique_lock lock{mutex};
if (pop_cv.wait_for(lock, timeout, [this]{ return !queue.empty(); }))
{
T x = std::move(queue.front());
queue.pop_front();
push_cv.notify_one();
return {std::move(x)};
}
return std::nullopt;
}
template <typename T>
void synchronized_queue<T>::clear()
{
std::lock_guard lock{mutex};
queue.clear();
push_cv.notify_all();
std::unique_lock lock{mutex_};
queue_.clear();
deferred_heap_.clear();
lock.unlock();
push_cv_.notify_all();
}
template <typename T>
void synchronized_queue<T>::wait()
{
std::unique_lock lock{mutex};
push_cv.wait(lock, [this]{ return queue.empty(); });
std::unique_lock lock{mutex_};
push_cv_.wait(lock, [this]{ return empty_internal(); });
}
template <typename T>
std::size_t synchronized_queue<T>::size() const
{
std::lock_guard lock{mutex};
return queue.size();
std::lock_guard lock{mutex_};
return size_internal();
}
template <typename T>
bool synchronized_queue<T>::empty() const
{
std::lock_guard lock{mutex_};
return empty_internal();
}
template <typename T>
void synchronized_queue<T>::flush_deferred()
{
auto const now = clock::now();
while (!deferred_heap_.empty() && deferred_heap_.front().time <= now)
{
queue_.push_back(std::move(deferred_heap_.front().value));
std::pop_heap(deferred_heap_.begin(), deferred_heap_.end(), heap_compare());
deferred_heap_.pop_back();
}
}
template <typename T>
std::size_t synchronized_queue<T>::size_internal() const
{
return queue_.size() + deferred_heap_.size();
}
template <typename T>
bool synchronized_queue<T>::empty_internal() const
{
return queue_.empty() && deferred_heap_.empty();
}
}