muduo_base程式碼剖析之BlockingQueue、BoundedBlockingQueue
阿新 • • 發佈:2018-11-26
BlockingQueue.h 無界佇列
程式碼功能:使用條件變數+互斥鎖,實現無界佇列(即生產者消費者模型),保證對臨界資源(佇列)的訪問是安全的
BlockingQueue類的原始碼
template<typename T>
class BlockingQueue : noncopyable
{
private:
mutable MutexLock mutex_; //互斥鎖,保護共享的佇列queue_
Condition notEmpty_ GUARDED_BY(mutex_); //條件變數
std::deque<T> queue_ GUARDED_BY (mutex_); //無界緩衝區佇列queue_
public:
BlockingQueue(): mutex_(),notEmpty_(mutex_),queue_() {}
void put(const T& x)
{
MutexLockGuard lock(mutex_); //修改佇列前,先加鎖
queue_.push_back(x); //向佇列中新增元素
notEmpty_.notify(); //通知當前佇列不為空,take()方法不阻塞
}
void put(T&& x)
{
MutexLockGuard lock (mutex_);
queue_.push_back(std::move(x));
notEmpty_.notify();
}
T take() //取出對頭元素
{
MutexLockGuard lock(mutex_);
// always use a while-loop, due to spurious wakeup
while (queue_.empty()) //如果佇列是空
{
notEmpty_.wait(); //則一直等待,直到佇列不為空
}
assert(!queue_.empty()); //再次判斷佇列不為空
T front(std::move(queue_.front())); //取出佇列中的頭元素,賦值給front變數
queue_.pop_front(); //彈出front
return std::move(front); //返回值為取出的對頭元素front
}
size_t size() const //返回佇列的大小
{
MutexLockGuard lock(mutex_);
return queue_.size();
}
};
示例程式碼:BlockingQueue_test.cc
#include <muduo/base/BlockingQueue.h>
#include <muduo/base/CountDownLatch.h>
#include <muduo/base/Thread.h>
#include <memory>
#include <string>
#include <vector>
#include <stdio.h>
#include <unistd.h>
class Test
{
public:
Test(int numThreads): latch_(numThreads)
{
for (int i = 0; i < numThreads; ++i)
{
char name[32];
snprintf(name, sizeof name, "work thread %d", i);
//建立執行緒,繫結threadFunc執行緒函式
threads_.emplace_back(new muduo::Thread(std::bind(&Test::threadFunc, this), muduo::string(name)));
}
for (auto& thr : threads_)
{
thr->start(); //讓執行緒函式threadFunc跑起來
}
}
void run(int times) //只有主執行緒生產產品,主執行緒是生產者
{
printf("waiting for count down latch\n");
//1.主執行緒將一直阻塞在count_>0條件上,直到count_!>0
//2.每個子執行緒啟動後,都會呼叫countDown()函式將count_--
latch_.wait();
//3.當count減為0時,wait被喚醒,繼續執行下面的程式碼
printf("all threads started\n");
for (int i = 0; i < times; ++i)
{
char buf[32];
snprintf(buf, sizeof buf, "hello %d", i);
queue_.put(buf); //主執行緒向queue_中放元素
printf("tid=%d, put data = %s, size = %zd\n", muduo::CurrentThread::tid(), buf, queue_.size());
}
}
void joinAll()
{
for (size_t i = 0; i < threads_.size(); ++i)
{
queue_.put("stop");
}
for (auto& thr : threads_)
{
thr->join();
}
}
private:
void threadFunc()
{
printf("tid=%d, %s started\n",
muduo::CurrentThread::tid(),
muduo::CurrentThread::name());
latch_.countDown(); //每個執行緒都對latch_.count_--
bool running = true;
while (running)
{
//queue_.take():使用條件變數控制佇列,當佇列為空時,一直阻塞
std::string d(queue_.take());
printf("tid=%d, get data = %s, size = %zd\n", muduo::CurrentThread::tid(), d.c_str(), queue_.size());
running = (d != "stop");
}
printf("tid=%d, %s stopped\n",
muduo::CurrentThread::tid(),
muduo::CurrentThread::name());
}
muduo::BlockingQueue<std::string> queue_;
muduo::CountDownLatch latch_;
std::vector<std::unique_ptr<muduo::Thread>> threads_; // 執行緒容器
};
void testMove()
{
muduo::BlockingQueue<std::unique_ptr<int>> queue;
queue.put(std::unique_ptr<int>(new int(42)));
std::unique_ptr<int> x = queue.take();
printf("took %d\n", *x);
*x = 123;
queue.put(std::move(x));
std::unique_ptr<int> y = queue.take();
printf("took %d\n", *y);
}
int main()
{
Test t(5);
t.run(100);
t.joinAll();
testMove();
printf("number of created threads %d\n", muduo::Thread::numCreated());
}
BoundedBlockingQueue 有界環形佇列
有界緩衝區:與無界緩衝區相比,多了一個條件變數notFull成員,並且使用boost庫的環形緩衝區。
template<typename T>
class BoundedBlockingQueue : noncopyable
{
private:
mutable MutexLock mutex_;
Condition notEmpty_ ;
Condition notFull_ ;
boost::circular_buffer<T> queue_ ; // 環形佇列
public:
explicit BoundedBlockingQueue(int maxSize) //maxSize環形佇列容量
: mutex_(),notEmpty_(mutex_),notFull_(mutex_),queue_(maxSize)
{}
void put(const T& x)
{
MutexLockGuard lock(mutex_);
while (queue_.full())
{
notFull_.wait();
}
assert(!queue_.full());
queue_.push_back(x);
notEmpty_.notify();
}
T take()
{
MutexLockGuard lock(mutex_);
while (queue_.empty())
{
notEmpty_.wait();
}
assert(!queue_.empty());
T front(queue_.front());
queue_.pop_front();
notFull_.notify();
return front;
}
bool empty() const
{
MutexLockGuard lock(mutex_);
return queue_.empty();
}
bool full() const
{
MutexLockGuard lock(mutex_);
return queue_.full();
}
size_t size() const //有效元素個數
{
MutexLockGuard lock(mutex_);
return queue_.size();
}
size_t capacity() const //佇列容量
{
MutexLockGuard lock(mutex_);
return queue_.capacity();
}
};