1. 程式人生 > 其它 >C++ 安全佇列

C++ 安全佇列

所謂的安全佇列,就是封裝一個加條件變數和互斥鎖的一個佇列類,以便在多執行緒訪問該佇列資源時是並行操作

1、多個執行緒從一個佇列中讀取資料,加互斥鎖和條件變數

#ifndef SHAREDQUEUE_HPP
#define SHAREDQUEUE_HPP

#include<iostream>
#include<memory>
#include<queue>
#include<mutex>
#include<condition_variable>

template <typename T>
class SharedQueue 
{
public: SharedQueue(); ~SharedQueue(); bool pop(std::unique_ptr<T>&& pData); void push(std::unique_ptr<T>&& item); int size(); bool empty(); public: T operator[] (int k) { return m_queue[k]; } private: std::deque<std::unique_ptr<T>> m_queue; std::mutex m_mutex; std::condition_variable m_cond; }; template
<typename T> SharedQueue<T>::SharedQueue(){ } template <typename T> SharedQueue<T>::~SharedQueue(){ } template <typename T> bool SharedQueue<T>::pop(std::unique_ptr<T>&& pData) { std::unique_lock<std::mutex> mlock(m_mutex); if (m_queue.empty()) {
return false; } pData = std::move(m_queue.front()); m_queue.pop_front(); return true; } template <typename T> void SharedQueue<T>::push(std::unique_ptr<T>&& item) { std::unique_lock<std::mutex> mlock(m_mutex); m_queue.push_back(std::move(item)); mlock.unlock(); m_cond.notify_all(); } template <typename T> int SharedQueue<T>::size() { std::unique_lock<std::mutex> mlock(m_mutex); int size = m_queue.size(); mlock.unlock(); return size; } template <typename T> bool SharedQueue<T>::empty() { std::unique_lock<std::mutex> mlock(m_mutex); return m_queue.empty(); } #endif

執行結果:

Thread idx : 0
pData : 0
pData : 1
pData : 2
pData : 3
pData : 4
pData : 5
pData : 6
pData : 7
pData : 8
pData : 9
Thread idx : 1
Thread idx : 2
Thread idx : 3

2、如果多執行緒安全佇列是用在生產者和消費者場景中,即多個生產者(多個執行緒)往佇列裡寫資料,多個消費者(多個執行緒)往佇列中讀資料

#ifndef SHAREDQUEUE_HPP
#define SHAREDQUEUE_HPP

#include<iostream>
#include<memory>
#include<queue>
#include<mutex>
#include<condition_variable>

template <typename T>
class SharedQueue 
{
public:
    SharedQueue();
    ~SharedQueue();

    T& front();
    T pop();
    void pop_front();
    void push_back(const T& item);
    void push_back(T&& item);
    void shut_down();

    // bool pop(std::unique_ptr<T>&& pData);
    // void push(std::unique_ptr<T>&& item);
    int size();
    bool empty();
    bool is_shutdown();

public:
    T operator[] (int k) {
        return m_queue[k];
    }
private:
    std::deque<std::unique_ptr<T>> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cond;
    bool m_bShutDown = false;
};

// #include"SharedQueue.hpp"
template <typename T>
SharedQueue<T>::SharedQueue(){

}

template <typename T>
SharedQueue<T>::~SharedQueue(){

}

template <typename T>
bool SharedQueue<T>::is_shutdown() {
    return m_bShutDown;
}

template <typename T>
void SharedQueue<T>::shut_down() {
    std::unique_lock<std::mutex> mlock(m_mutex);
    m_queue.clear();
    m_bShutDown = true;
}

template <typename T>
T SharedQueue<T>::pop()
{
    std::unique_lock<std::mutex> mlock(m_mutex);
    // if (m_queue.empty()) {
    //     return false;
    // }

    m_cond.wait(mlock, [this](){return !this.m_queue.empty();});
    T rc(std::move(m_queue.front()));
    m_queue.pop_front();

    return rc;
}

template <typename T>
T& SharedQueue<T>::front()
{
    std::unique_lock<std::mutex> mlock(m_mutex);
    while(m_queue.empty()) {
        m_cond.wait(mlock);
    }    

    return m_queue.front();
}

template <typename T>
void SharedQueue<T>::pop_front()
{
    std::unique_lock<std::mutex> mlock(m_mutex);
    while(m_queue.empty()) {
        m_cond.wait(mlock);
    }

    m_queue.pop_front();
}

template <typename T>
void SharedQueue<T>::push_back(const T& item)
{
    std::unique_lock<std::mutex> mlock(m_mutex);
    m_queue.push_back(item);
    mlock.unlock();
    m_cond.notify_one();
}

template <typename T>
void SharedQueue<T>::push_back(T&& item)
{
    std::unique_lock<std::mutex> mlock(m_mutex);
    m_queue.push_back(std::move(item));
    mlock.unlock();
    m_cond.notify_one();  
}

template <typename T>
int SharedQueue<T>::size()
{
    std::unique_lock<std::mutex> mlock(m_mutex);
    int size = m_queue.size();
    mlock.unlock();
    return size;
}

template <typename T>
bool SharedQueue<T>::empty()
{
    std::unique_lock<std::mutex> mlock(m_mutex);
    bool is_empty = m_queue.empty();
    mlock.unlock();
    return is_empty;
}

#endif