C++並行程式設計(二): 利用C++標準庫實現Semaphore訊號量
在上一節中,我們使用C++11標準庫中的提供的條件變數以及互斥變數封裝實現了兩個仿Windows核心事件類:ManualResetEvent和AutoResetEvent。在這一節中,我們將繼續使用標準庫中提供的類來實現高仿訊號量Semaphore。而後文的程式碼中,有使用到上一節程式碼中定義的某些抽象基類,看不懂的讀者們可以查閱上一章《C++並行程式設計(一): 利用C++標準庫實現仿Windows核心事件物件》。
Semaphore訊號量機制在Windows和Linux中都有提供,作為作業系統最重要的一種執行緒間同步機制之一。在我個人的程式設計生涯中,Semaphore最多的使用場景是用於實現生產者-消費者佇列
下面的程式碼和上一節中的程式碼是同一個專案中的程式碼,並且引用到了上一節的程式碼,大家可以把它們合併到一起。廢話少說,上乾貨!(本文所有程式碼在VS2017下驗證通過)
Semaphore.h
和上一節的設計一樣,我們先定義一組無實現的抽象介面,抽象介面中提供靜態方法CreateSemaphore建立實體物件。
// Semaphore.h #pragma once #include "Event.h" namespace Threads { // 訊號量 class Semaphore : WaitHandle { public: // 無上限 static const int Unlimited = -1; // 建立訊號量 static unique_ptr<Semaphore> CreateSemaphore(int initialCount, int maxCount = Unlimited); public: // 等待一個訊號量 virtual void Wait() = 0; // 限時等待訊號量 virtual bool Wait(const milliseconds& timeout) = 0; // 釋放訊號量 virtual void Release() = 0; // 釋放訊號量指定次數 virtual void Release(int releaseCount) = 0; // 獲取剩餘訊號量數量. virtual int GetCurrentCount() const = 0; // 釋放訊號量物件 virtual void Close() = 0; }; }
SemaphoreImpl.h
接下來,我們定義實現類,原則是,使用者不應該直接使用這個類。
#pragma once
#include "Semaphore.h"
#include <mutex>
#include <atomic>
#include <condition_variable>
namespace Threads
{
// 訊號量實現類
class SemaphoreImpl : public Semaphore
{
mutex m_mutex;
condition_variable m_cond;
atomic<bool> m_bDisposed;
volatile int m_currentCount;
int m_maxCount;
public:
SemaphoreImpl(int initialCount, int maxCount);
// 等待一個訊號量
virtual void Wait();
// 限時等待訊號量
virtual bool Wait(const milliseconds& timeout);
// 釋放訊號量
virtual void Release();
// 釋放訊號量指定次數
virtual void Release(int releaseCount);
// 獲取剩餘訊號量數量.
virtual int GetCurrentCount() const;
// 釋放訊號量物件
virtual void Close();
private:
void CheckDisposed()const;
};
}
Semaphore.cpp
這個抽象類中唯一需要實現是靜態方法CreateSemaphore,用以建立實體物件,並返回其唯一引用。
#include "Semaphore.h"
#include "SemaphoreImpl.h"
using namespace Threads;
unique_ptr<Semaphore> Semaphore::CreateSemaphore(int initialCount, int maxCount)
{
if (initialCount < 0)
{
throw out_of_range("initialCount < 0");
}
if (maxCount != Unlimited && (maxCount < initialCount || maxCount < 1))
{
throw out_of_range("maxCount < initialCount || maxCount < 1");
}
return unique_ptr<Semaphore>(new SemaphoreImpl(initialCount, maxCount));
}
SemaphoreImpl.cpp
乾貨滿滿的時間到了!
// SemaphoreImpl.cpp
#include "SemaphoreImpl.h"
using namespace Threads;
SemaphoreImpl::SemaphoreImpl(int initialCount, int maxCount)
:m_currentCount(initialCount), m_maxCount(maxCount),
m_bDisposed(false)
{
}
void Threads::SemaphoreImpl::Wait()
{
unique_lock<mutex> lock(m_mutex);
m_cond.wait(lock, [this] {return GetCurrentCount() > 0; });
--m_currentCount;
}
bool Threads::SemaphoreImpl::Wait(const milliseconds & timeout)
{
unique_lock<mutex> lock(m_mutex);
if (m_cond.wait_for(lock, timeout, [this] {return GetCurrentCount() > 0; }))
{
--m_currentCount;
return true;
}
return false;
}
void Threads::SemaphoreImpl::Release()
{
Release(1);
}
void Threads::SemaphoreImpl::Release(int releaseCount)
{
if (releaseCount < 1)
{
throw invalid_argument("releaseCount < 1");
}
lock_guard<mutex> lock(m_mutex);
CheckDisposed();
if (m_maxCount != Semaphore::Unlimited && GetCurrentCount() + releaseCount > m_maxCount)
{
throw invalid_argument("GetCurrentCount() + releaseCount > m_maxCount");
}
m_currentCount += releaseCount;
do {
m_cond.notify_one();
} while (--releaseCount);
}
int Threads::SemaphoreImpl::GetCurrentCount() const
{
CheckDisposed();
return m_currentCount;
}
void SemaphoreImpl::Close()
{
auto expected = false;
if (!m_bDisposed.compare_exchange_weak(expected, true))
{
return;
}
lock_guard<mutex> lock(m_mutex);
m_cond.notify_all();
}
void Threads::SemaphoreImpl::CheckDisposed() const
{
if (m_bDisposed.load() == true)
{
throw logic_error("SemaphoreImpl Disposed.");
}
}
//==================== 華麗的分割線 ====================
至此,一個高仿的Semaphore就已經實現完畢了,讀者們看懂的嗎?接下來,就該上示例Demo了:
main.cpp
#include <cstdio>
#include <stdlib.h>
#include <iostream>
#include <memory>
#include <thread>
#include "Semaphore.h"
using namespace std;
using namespace Threads;
using namespace chrono;
int main()
{
auto sem = Semaphore::CreateSemaphore(0);
thread t1([&] {
try
{
while (true)
{
// 等待訊號量
sem->Wait();
cout << "[Thread1] Wake up!" << endl;
}
}
catch (const std::logic_error& e) // 訊號量被銷燬
{
cerr << "[Thread1] : " << e.what() << endl;
}
});
thread t2([&] {
try
{
while (true)
{
if (!sem->Wait(duration_cast<milliseconds>(seconds(5))))
{
cout << "[Thread2] Time out!" << endl;
continue;
}
cout << "[Thread2] Wake up!" << endl;
}
}
catch (const std::logic_error& e) // 訊號量被銷燬
{
cerr << "[Thread2] : " << e.what() << endl;
}
});
thread t3([&] {
try
{
while (true)
{
sem->Wait();
cout << "[Thread3] Wake up!" << endl;
}
}
catch (const std::logic_error& e) // 訊號量被銷燬
{
cerr << "[Thread3] : " << e.what() << endl;
}
});
while (true)
{
int input = 0;
cout << "Please input an integer(-1 to exit): " << endl;
cin >> input;
if (input < 0)
{
sem->Close();
break;
}
sem->Release(max(input % 5, 1));
}
t1.join();
t2.join();
t3.join();
return 0;
}
上述示例演示了訊號量的等待、限時等待以及釋放中斷等幾種情況,並且在Windows和Linux下測試通過。