C++ 安全佇列
阿新 • • 發佈:2021-10-20
所謂的安全佇列,就是封裝了互斥鎖和條件變數的佇列類,使多個執行緒可以同時、安全地訪問該佇列資源
1、多個執行緒從一個佇列中讀取資料,加互斥鎖和條件變數
#ifndef SHAREDQUEUE_HPP
#define SHAREDQUEUE_HPP

#include <condition_variable>
#include <cstddef>
#include <deque>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

/// @brief Mutex-protected FIFO queue that owns its elements through
///        std::unique_ptr<T>.
///
/// Every member function takes the internal mutex, so a single instance may be
/// used from several threads. pop() is non-blocking: it reports an empty queue
/// by returning false instead of waiting.
template <typename T>
class SharedQueue {
public:
    SharedQueue();
    ~SharedQueue();

    /// @brief Try to remove the oldest element.
    /// @param pData Receives ownership of the removed element on success;
    ///              left untouched when the queue is empty. Callers pass it
    ///              as `std::move(ptr)`.
    /// @return true if an element was removed, false if the queue was empty.
    bool pop(std::unique_ptr<T>&& pData);

    /// @brief Append an element, taking ownership of it.
    void push(std::unique_ptr<T>&& item);

    /// @brief Number of queued elements at the time of the call.
    int size();

    /// @brief Whether the queue held no elements at the time of the call.
    bool empty();

public:
    /// @brief Copy of the element at position k (0 is the oldest).
    /// @throws std::out_of_range if k is negative or not less than size().
    /// @note The stored pointer at k must be non-null; a null element that was
    ///       pushed by the caller is dereferenced here.
    T operator[](int k) {
        std::lock_guard<std::mutex> lock(m_mutex);
        // A negative k wraps to a huge index, which at() rejects as well.
        return *m_queue.at(static_cast<std::size_t>(k));
    }

private:
    std::deque<std::unique_ptr<T>> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cond;
};

template <typename T>
SharedQueue<T>::SharedQueue() {
}

template <typename T>
SharedQueue<T>::~SharedQueue() {
}

template <typename T>
bool SharedQueue<T>::pop(std::unique_ptr<T>&& pData) {
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_queue.empty()) {
        return false;
    }
    pData = std::move(m_queue.front());
    m_queue.pop_front();
    return true;
}

template <typename T>
void SharedQueue<T>::push(std::unique_ptr<T>&& item) {
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push_back(std::move(item));
    }
    // Notify after releasing the lock. No member of this class waits on
    // m_cond, so the notification currently has no observer inside the class.
    m_cond.notify_all();
}

template <typename T>
int SharedQueue<T>::size() {
    std::lock_guard<std::mutex> lock(m_mutex);
    return static_cast<int>(m_queue.size());
}

template <typename T>
bool SharedQueue<T>::empty() {
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_queue.empty();
}

#endif
執行結果:
Thread idx : 0
pData : 0
pData : 1
pData : 2
pData : 3
pData : 4
pData : 5
pData : 6
pData : 7
pData : 8
pData : 9
Thread idx : 1
Thread idx : 2
Thread idx : 3
2、如果多執行緒安全佇列是用在生產者和消費者場景中,即多個生產者(多個執行緒)往佇列裡寫資料,多個消費者(多個執行緒)從佇列中讀資料
#ifndef SHAREDQUEUE_HPP
#define SHAREDQUEUE_HPP

#include <condition_variable>
#include <cstddef>
#include <deque>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <utility>

/// @brief Blocking FIFO queue for producer/consumer use.
///
/// Producers call push_back(); consumers call pop(), front() or pop_front(),
/// which block on a condition variable while the queue is empty. shut_down()
/// discards pending items and wakes every blocked consumer.
template <typename T>
class SharedQueue {
public:
    SharedQueue();
    ~SharedQueue();

    /// @brief Block until an item is available and return a reference to it.
    /// @throws std::runtime_error if the queue is shut down and empty.
    /// @note The reference is used after the lock is released; it stays valid
    ///       only until another thread removes the front item or shuts the
    ///       queue down.
    T& front();

    /// @brief Block until an item is available, then remove and return it.
    /// @throws std::runtime_error if the queue is shut down and empty.
    T pop();

    /// @brief Block until an item is available, then discard it.
    ///        Returns without removing anything if the queue is shut down
    ///        and empty.
    void pop_front();

    /// @brief Append a copy of item and wake one waiting consumer.
    void push_back(const T& item);

    /// @brief Append item by move and wake one waiting consumer.
    void push_back(T&& item);

    /// @brief Drop all queued items, mark the queue as shut down and wake
    ///        every blocked consumer.
    void shut_down();

    /// @brief Number of queued items at the time of the call.
    int size();

    /// @brief Whether the queue held no items at the time of the call.
    bool empty();

    /// @brief Whether shut_down() has been called.
    bool is_shutdown();

public:
    /// @brief Copy of the item at position k (0 is the oldest).
    /// @throws std::out_of_range if k is negative or not less than size().
    T operator[](int k) {
        std::lock_guard<std::mutex> lock(m_mutex);
        // A negative k wraps to a huge index, which at() rejects as well.
        return m_queue.at(static_cast<std::size_t>(k));
    }

private:
    // Items are stored by value: every accessor hands out T / T&, so the
    // container element type has to be T itself.
    std::deque<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cond;
    bool m_bShutDown = false;  // guarded by m_mutex
};

template <typename T>
SharedQueue<T>::SharedQueue() {
}

template <typename T>
SharedQueue<T>::~SharedQueue() {
}

template <typename T>
bool SharedQueue<T>::is_shutdown() {
    // The flag is written under m_mutex, so it is read under it too.
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_bShutDown;
}

template <typename T>
void SharedQueue<T>::shut_down() {
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.clear();
        m_bShutDown = true;
    }
    // Wake every consumer so none stays blocked on a queue that is closed.
    m_cond.notify_all();
}

template <typename T>
T SharedQueue<T>::pop() {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cond.wait(lock, [this]() { return !m_queue.empty() || m_bShutDown; });
    if (m_queue.empty()) {
        throw std::runtime_error("SharedQueue::pop: queue has been shut down");
    }
    T item(std::move(m_queue.front()));
    m_queue.pop_front();
    return item;
}

template <typename T>
T& SharedQueue<T>::front() {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cond.wait(lock, [this]() { return !m_queue.empty() || m_bShutDown; });
    if (m_queue.empty()) {
        throw std::runtime_error("SharedQueue::front: queue has been shut down");
    }
    return m_queue.front();
}

template <typename T>
void SharedQueue<T>::pop_front() {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cond.wait(lock, [this]() { return !m_queue.empty() || m_bShutDown; });
    if (!m_queue.empty()) {
        m_queue.pop_front();
    }
}

template <typename T>
void SharedQueue<T>::push_back(const T& item) {
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push_back(item);
    }
    // Notify after unlocking so the woken consumer can take the mutex at once.
    m_cond.notify_one();
}

template <typename T>
void SharedQueue<T>::push_back(T&& item) {
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push_back(std::move(item));
    }
    m_cond.notify_one();
}

template <typename T>
int SharedQueue<T>::size() {
    std::lock_guard<std::mutex> lock(m_mutex);
    return static_cast<int>(m_queue.size());
}

template <typename T>
bool SharedQueue<T>::empty() {
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_queue.empty();
}

#endif