Boost.Interprocess使用手冊翻譯之六:同步機制(Synchronization mechanisms)
六. 同步機制
同步機制概述 互斥量 條件變數 訊號量 升級互斥量 通過移動語義轉移鎖 檔案鎖 訊息佇列 |
同步機制概述
具名和匿名同步機制 同步機制型別 |
如前所述,如果對記憶體的訪問不能有效的同步,則通過記憶體對映檔案或共享記憶體物件在程序間共享記憶體的能力就不是非常有用了。與需要在程序間共享堆疊和全域性變數的程序間同步機制遇到的問題一樣,訪問這些資源一般需要使用互斥量或條件變數進行同步。Boost.Threads
具名和匿名同步機制
Boost.Interprocess提供了兩種同步物件:
- 具名實用工具:若兩個程序想建立此種類型的物件,它們必須使用相同的名字建立一個物件。這與建立或開啟檔案類似:一個程序使用採用filename的fstream建立一個檔案,另一個程序使用另一個採用同樣的名字為引數的fstream開啟檔案。雖然每個程序都使用一個不同的物件來訪問資源,但這些程序均使用相同的底層資源。
- 匿名實用工具:因為這些實用工具沒有名字,因此兩個程序必須通過共享記憶體或記憶體對映檔案共享同一個物件
以上兩種方式各有優缺點:
- 具名實用工具在處理簡單的同步任務時要簡單些,因為程序不需要建立共享記憶體區域以及構建同步機制。
- 當使用記憶體對映物件獲得同步工具的自動持久化屬性時,匿名實用工具可以被序列化至磁碟。你可以在記憶體對映檔案上構建一個同步工具,重啟系統,再次對映此檔案,從而再次使用此同步化工具。這在具名實用工具的方式下是不行的。
具名和匿名實用工具在介面上的不同主要反映在建構函式上。一般而言,匿名實用工具僅有一個建構函式,而具名實用工具有多個建構函式,它們的第一個引數是一個需要建立或開啟底層資源的特殊型別:
using namespace boost::interprocess;
//Create the synchronization utility. If it previously
//exists, throws an error
NamedUtility(create_only, ...)
//Open the synchronization utility. If it does not previously
//exist, it's created.
NamedUtility(open_or_create, ...)
//Open the synchronization utility. If it does not previously
//exist, throws an error.
NamedUtility(open_only, ...)
另一方面,匿名同步實用工具僅能被建立,並且程序間必須使用建立此實用工具的其他同步機制進行同步:
using namespace boost::interprocess;
//Create the synchronization utility.
AnonymousUtility(...)
同步機制型別
除了其具名/匿名特性,Boost.Interprocess提供了一下同步實用工具:
- 互斥量(具名/匿名)
- 條件變數(具名/匿名)
- 訊號量(具名/匿名)
- 升級互斥型別
- 檔案鎖
互斥量
什麼是一個互斥量 互斥量操作 Boost.Interprocess互斥量型別和標頭檔案 區域性鎖 匿名互斥量示例 具名互斥量示例 |
什麼是一個互斥量
互斥量代表相互排斥並且它是程序間不同的最基本形式。互斥量保證僅有一個執行緒能夠鎖定一個給定的互斥量。如果一個程式碼片段被一個互斥量鎖定或解鎖,它保證一次僅有一個執行緒執行此段程式碼。當這個執行緒解鎖了互斥量,另一個執行緒方可進入那段程式碼區域:
//The mutex has been previously constructed
lock_the_mutex();
//This code will be executed only by one thread
//at a time.
unlock_the_mutex();
一個互斥量可以是遞迴或非遞迴的:
- 遞迴互斥量可以被同一個執行緒鎖多次。為了完全解鎖互斥量,此執行緒必須解鎖同樣的次數。
- 非遞迴互斥量不能被同一個執行緒鎖多次。如果一個互斥量被一個執行緒鎖了兩次,結果是未定義的,可能會丟擲一個錯誤或是執行緒被死鎖。
互斥量操作
所有Boost.Interprocess的互斥量型別均有如下操作:
void lock()
效果:呼叫執行緒嘗試獲取互斥量的所有權,並且如果其他執行緒已經擁有了此互斥量的所有權,它等待直到能獲得所有權為止。如果一個執行緒擁有了此互斥量的所有權,此互斥量必須被同一個執行緒解鎖。如果互斥量支援遞迴鎖定,則互斥量必須解鎖同樣的次數。
丟擲:interprocess_exception錯誤。
bool try_lock()
效果:呼叫執行緒嘗試獲取互斥量的所有權,並且如果其他執行緒已經擁有了此互斥量的所有權,函式立刻返回。如果互斥量支援遞迴鎖定,則互斥量必須解鎖同樣的次數。
返回:如果執行緒獲取了互斥量的所有權,返回true;如果其他執行緒已經擁有了此互斥量的所有權,返回false。
丟擲:interprocess_exception錯誤。
booltimed_lock(const boost::posix_time::ptime &abs_time)
效果:呼叫執行緒嘗試獲取互斥量的獨佔權限直到達到指定的時間。如果互斥量支援遞迴鎖定,則互斥量必須解鎖同樣的次數。
返回:如果執行緒獲取了互斥量的所有權,返回true,如果超時,返回false。
丟擲:interprocess_exception錯誤。
void unlock()
前提條件:執行緒必須已經擁有了互斥量的獨佔權。
效果:呼叫執行緒釋放互斥量的獨佔權。如果互斥量支援遞迴鎖定,則互斥量必須解鎖同樣的次數。
丟擲:一個派生自interprocess_exception的異常。
Boost.Interprocess互斥量型別和標頭檔案
Boost.Interprocess提供瞭如下互斥量型別:
#include <boost/interprocess/sync/interprocess_mutex.hpp>
- interprocess_mutex: 一個非遞迴的、匿名的互斥量,它能夠被置於共享記憶體和記憶體對映檔案中。
#include <boost/interprocess/sync/interprocess_recursive_mutex.hpp>
- interprocess_recursive_mutex:一個遞迴的、匿名的互斥量,它能夠被置於共享記憶體和記憶體對映檔案中。
#include <boost/interprocess/sync/named_mutex.hpp>
- named_mutex:一個非遞迴的、具名的互斥量。
#include <boost/interprocess/sync/named_recursive_mutex.hpp>
- named_recursive_mutex:一個遞迴的、具名的互斥量。
區域性鎖
當程序已經完成了讀寫資料後及時解鎖互斥量,這是非常重要的。這一點在遇到異常時,是比較困難的。因此,互斥量一般是帶區域性鎖使用,區域性鎖能保證互斥量一直能被解鎖,哪怕有異常發生。為使用一個區域性鎖,僅需包含:
#include <boost/interprocess/sync/scoped_lock.hpp>
基本上,一個區域性鎖在其解構函式中呼叫 unlock(),因此一個互斥量在異常發生時,總能被解鎖。區域性鎖有許多建構函式用於lock, try_lock, timed_lock一個互斥量或者不鎖定。
using namespace boost::interprocess;
//Let's create any mutex type:
MutexType mutex;
{
//This will lock the mutex
scoped_lock<MutexType> lock(mutex);
//Some code
//The mutex will be unlocked here
}
{
//This will try_lock the mutex
scoped_lock<MutexType> lock(mutex, try_to_lock);
//Check if the mutex has been successfully locked
if(lock){
//Some code
}
//If the mutex was locked it will be unlocked
}
{
boost::posix_time::ptime abs_time = ...
//This will timed_lock the mutex
scoped_lock<MutexType> lock(mutex, abs_time);
//Check if the mutex has been successfully locked
if(lock){
//Some code
}
//If the mutex was locked it will be unlocked
}
更多詳情,參考scoped_lock'sreference.
匿名互斥量示例
想象一下兩個程序需要寫入跟蹤資料至一個建立在共享記憶體上的迴圈緩衝區。每個程序需要獲得迴圈緩衝區的獨佔訪問,寫跟蹤資料然後繼續。
為了保護迴圈緩衝區,我們可以儲存一個程序共享互斥量在迴圈緩衝區上。每個程序將在寫資料前鎖此互斥量,並且在結束寫跟蹤資料後將寫一個標記(標頭檔案doc_anonymous_mutex_shared_data.hpp):
#include <boost/interprocess/sync/interprocess_mutex.hpp>
struct shared_memory_log
{
enum { NumItems = 100 };
enum { LineSize = 100 };
shared_memory_log()
: current_line(0)
, end_a(false)
, end_b(false)
{}
//Mutex to protect access to the queue
boost::interprocess::interprocess_mutex mutex;
//Items to fill
char items[NumItems][LineSize];
int current_line;
bool end_a;
bool end_b;
};
這是程序的主要過程。建立共享記憶體,構建迴圈緩衝區,然後開始寫跟蹤資料:
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include "doc_anonymous_mutex_shared_data.hpp"
#include <iostream>
#include <cstdio>
using namespace boost::interprocess;
int main ()
{
try{
//Remove shared memory on construction and destruction
struct shm_remove
{
shm_remove() { shared_memory_object::remove("MySharedMemory"); }
~shm_remove(){ shared_memory_object::remove("MySharedMemory"); }
} remover;
//Create a shared memory object.
shared_memory_object shm
(create_only //only create
,"MySharedMemory" //name
,read_write //read-write mode
);
//Set size
shm.truncate(sizeof(shared_memory_log));
//Map the whole shared memory in this process
mapped_region region
(shm //What to map
,read_write //Map it as read-write
);
//Get the address of the mapped region
void * addr = region.get_address();
//Construct the shared structure in memory
shared_memory_log * data = new (addr) shared_memory_log;
//Write some logs
for(int i = 0; i < shared_memory_log::NumItems; ++i){
//Lock the mutex
scoped_lock<interprocess_mutex> lock(data->mutex);
std::sprintf(data->items[(data->current_line++) % shared_memory_log::NumItems]
,"%s_%d", "process_a", i);
if(i == (shared_memory_log::NumItems-1))
data->end_a = true;
//Mutex is released here
}
//Wait until the other process ends
while(1){
scoped_lock<interprocess_mutex> lock(data->mutex);
if(data->end_b)
break;
}
}
catch(interprocess_exception &ex){
std::cout << ex.what() << std::endl;
return 1;
}
return 0;
}
第二個程序開啟共享記憶體,獲取對迴圈緩衝區的訪問,然後開始寫跟蹤資料:
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include "doc_anonymous_mutex_shared_data.hpp"
#include <iostream>
#include <cstdio>
using namespace boost::interprocess;
int main ()
{
//Remove shared memory on destruction
struct shm_remove
{
~shm_remove(){ shared_memory_object::remove("MySharedMemory"); }
} remover;
//Open the shared memory object.
shared_memory_object shm
(open_only //only create
,"MySharedMemory" //name
,read_write //read-write mode
);
//Map the whole shared memory in this process
mapped_region region
(shm //What to map
,read_write //Map it as read-write
);
//Get the address of the mapped region
void * addr = region.get_address();
//Construct the shared structure in memory
shared_memory_log * data = static_cast<shared_memory_log*>(addr);
//Write some logs
for(int i = 0; i < 100; ++i){
//Lock the mutex
scoped_lock<interprocess_mutex> lock(data->mutex);
std::sprintf(data->items[(data->current_line++) % shared_memory_log::NumItems]
,"%s_%d", "process_a", i);
if(i == (shared_memory_log::NumItems-1))
data->end_b = true;
//Mutex is released here
}
//Wait until the other process ends
while(1){
scoped_lock<interprocess_mutex> lock(data->mutex);
if(data->end_a)
break;
}
return 0;
}
如我們所看到的,一個互斥量可用於保護資料,但是沒有向另一個程序發事件通知。基於此,我們需要一個條件變數,我們將在下一節闡述它。
具名互斥量示例
現在想象一個兩個程序想寫一份跟蹤資料至一個檔案中。首先,它們寫它們的名字,然後它們寫此訊息。因為作業系統能夠在任意時刻中斷一個程序,我們可能會在兩個程序中混淆此訊息的部分,因此我們需要一種原子寫整個訊息至檔案中的方法。為達此目的,我們可以使用一個具名互斥量以便每個程序在寫資料前鎖定此互斥量:
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <fstream>
#include <iostream>
#include <cstdio>
int main ()
{
using namespace boost::interprocess;
try{
struct file_remove
{
file_remove() { std::remove("file_name"); }
~file_remove(){ std::remove("file_name"); }
} file_remover;
struct mutex_remove
{
mutex_remove() { named_mutex::remove("fstream_named_mutex"); }
~mutex_remove(){ named_mutex::remove("fstream_named_mutex"); }
} remover;
//Open or create the named mutex
named_mutex mutex(open_or_create, "fstream_named_mutex");
std::ofstream file("file_name");
for(int i = 0; i < 10; ++i){
//Do some operations...
//Write to file atomically
scoped_lock<named_mutex> lock(mutex);
file << "Process name, ";
file << "This is iteration #" << i;
file << std::endl;
}
}
catch(interprocess_exception &ex){
std::cout << ex.what() << std::endl;
return 1;
}
return 0;
}
條件變數
什麼是一個條件變數 Boost.Interprocess條件變數型別和標頭檔案 匿名條件變數示例 |
什麼是一個條件變數
在之前的例子中,互斥量被用來鎖定,但直到條件變量出現前,我們並不能有效使用它。一個條件變數可以做兩件事情:
- 等:執行緒被阻塞直到另一個執行緒通知它繼續,因為導致等待的條件已經消失了。
- 通知:執行緒傳送一個訊號至一個阻塞的執行緒或者至所有阻塞的執行緒,告訴它們導致它們等待的條件已經消失了。
在條件變數中等待總是會關聯一個互斥量。在等待條件之前,互斥量需要預先被鎖定。當在條件變數中等待時,執行緒解鎖互斥量然後原子等待。
當執行緒從等待函式(例如可能由於一個訊號或超時)返回後,互斥量物件再次被鎖定。
Boost.Interprocess條件變數型別和標頭檔案
Boost.Interprocess提供如下的條件型別:
#include <boost/interprocess/sync/interprocess_condition.hpp>
- interprocess_condition:一個匿名條件變數,它位於共享記憶體或記憶體對映空間被用於boost::interprocess::interprocess_mutex。
#include <boost/interprocess/sync/named_condition.hpp>
- named_condition:一個具名條件變數被用於named_mutex。
具名條件與匿名條件類似,但它們結合具名互斥量使用。有時,我們不希望儲存帶同步資料的同步物件:
- 我們想採用同樣的資料改變同步方式(從程序間變為程序內,或不適用任何同步方式)。儲存帶共享資料的程序間共享匿名同步物件將禁止這樣做。
- 我們想通過網路或其他通訊方式傳送同步資料。傳送程序共享的同步物件沒有任何意義。
匿名條件變數示例
想象一下,一個程序寫一份跟蹤資料至簡單共享記憶體緩衝區,另一個程序一個接一個打印出來。第一個程序寫跟蹤資料,然後等待直到另一個程序打印出這份資料。為達到此目的,我們可以使用兩個條件變數:第一個用於阻塞傳送者直到第二個程序打印出此訊息,第二個用於阻塞接收者直到緩衝區中有資料供列印。
共享記憶體跟蹤資料緩衝區(doc_anonymous_condition_shared_data.hpp):
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
struct trace_queue
{
enum { LineSize = 100 };
trace_queue()
: message_in(false)
{}
//Mutex to protect access to the queue
boost::interprocess::interprocess_mutex mutex;
//Condition to wait when the queue is empty
boost::interprocess::interprocess_condition cond_empty;
//Condition to wait when the queue is full
boost::interprocess::interprocess_condition cond_full;
//Items to fill
char items[LineSize];
//Is there any message
bool message_in;
};
這是程序的主要過程。建立共享記憶體、放置緩衝區在其上然後開始一個接一個寫訊息直到它寫了一個"last message"表明沒有更多訊息供列印:
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
#include <cstdio>
#include "doc_anonymous_condition_shared_data.hpp"
using namespace boost::interprocess;
int main ()
{
//Erase previous shared memory and schedule erasure on exit
struct shm_remove
{
shm_remove() { shared_memory_object::remove("MySharedMemory"); }
~shm_remove(){ shared_memory_object::remove("MySharedMemory"); }
} remover;
//Create a shared memory object.
shared_memory_object shm
(create_only //only create
,"MySharedMemory" //name
,read_write //read-write mode
);
try{
//Set size
shm.truncate(sizeof(trace_queue));
//Map the whole shared memory in this process
mapped_region region
(shm //What to map
,read_write //Map it as read-write
);
//Get the address of the mapped region
void * addr = region.get_address();
//Construct the shared structure in memory
trace_queue * data = new (addr) trace_queue;
const int NumMsg = 100;
for(int i = 0; i < NumMsg; ++i){
scoped_lock<interprocess_mutex> lock(data->mutex);
if(data->message_in){
data->cond_full.wait(lock);
}
if(i == (NumMsg-1))
std::sprintf(data->items, "%s", "last message");
else
std::sprintf(data->items, "%s_%d", "my_trace", i);
//Notify to the other process that there is a message
data->cond_empty.notify_one();
//Mark message buffer as full
data->message_in = true;
}
}
catch(interprocess_exception &ex){
std::cout << ex.what() << std::endl;
return 1;
}
return 0;
}
第二個程序開啟共享記憶體,然後列印每個訊息直到 "last message" 訊息:
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
#include <cstring>
#include "doc_anonymous_condition_shared_data.hpp"
using namespace boost::interprocess;
int main ()
{
//Create a shared memory object.
shared_memory_object shm
(open_only //only create
,"MySharedMemory" //name
,read_write //read-write mode
);
try{
//Map the whole shared memory in this process
mapped_region region
(shm //What to map
,read_write //Map it as read-write
);
//Get the address of the mapped region
void * addr = region.get_address();
//Obtain a pointer to the shared structure
trace_queue * data = static_cast<trace_queue*>(addr);
//Print messages until the other process marks the end
bool end_loop = false;
do{
scoped_lock<interprocess_mutex> lock(data->mutex);
if(!data->message_in){
data->cond_empty.wait(lock);
}
if(std::strcmp(data->items, "last message") == 0){
end_loop = true;
}
else{
//Print the message
std::cout << data->items << std::endl;
//Notify the other process that the buffer is empty
data->message_in = false;
data->cond_full.notify_one();
}
}
while(!end_loop);
}
catch(interprocess_exception &ex){
std::cout << ex.what() << std::endl;
return 1;
}
return 0;
}
採用條件變數,如果一個執行緒不能繼續工作,它能夠阻塞,並且當遇到可以繼續的條件時,另一個執行緒能夠喚醒它。
訊號量
什麼是一個訊號量 Boost.Interprocess訊號量型別和標頭檔案 匿名訊號量示例 |
什麼是一個訊號量
訊號量是一種基於內部計數的程序間同步機制,它提供了兩種基本操作:
- 等待:測試訊號量計數值,如果值小於等於0,則等待。否則,減1訊號量計數值。
- 委派(Post):增1訊號量計數值。如果有程序被阻塞了,則其中之一被喚醒。
如果訊號量計數值被初始化為1,則等待操作與互斥鎖等價,委派操作與互斥解鎖等價。這種型別的訊號量被稱為二進位制訊號量。
儘管訊號量可以像互斥量一樣使用,它有一個獨特的特性:與互斥量不同,委派操作不需要由執行等待操作的那個相同的執行緒/程序來執行。
Boost.Interprocess訊號量型別和標頭檔案
Boost.Interprocess提供如下型別的訊號量:
#include <boost/interprocess/sync/interprocess_semaphore.hpp>
- interprocess_semaphore: 一種匿名訊號量,它能被置於共享記憶體或記憶體對映檔案中。
#include <boost/interprocess/sync/named_semaphore.hpp>
- named_semaphore:一種具名訊號量。
匿名訊號量示例
我們將在共享記憶體中實現一個整型陣列,此陣列將被用來從一個程序向另一個程序傳遞資料。第一個程序將向陣列內寫一些整數,並且如果陣列滿了,程序將阻塞。
第二個程序將拷貝傳輸過來的資料至其自身的buffer中,如果buffer中沒有新資料了,則此程序阻塞。
以下是共享整型陣列(doc_anonymous_semaphore_shared_data.hpp):
#include <boost/interprocess/sync/interprocess_semaphore.hpp>
struct shared_memory_buffer
{
enum { NumItems = 10 };
shared_memory_buffer()
: mutex(1), nempty(NumItems), nstored(0)
{}
//Semaphores to protect and synchronize access
boost::interprocess::interprocess_semaphore
mutex, nempty, nstored;
//Items to fill
int items[NumItems];
};
以下是主程序。建立共享記憶體、放置整型陣列至共享記憶體上,然後開始一個個填充整數,如果陣列滿了,則阻塞。
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <iostream>
#include "doc_anonymous_semaphore_shared_data.hpp"
using namespace boost::interprocess;
int main ()
{
//Remove shared memory on construction and destruction
struct shm_remove
{
shm_remove() { shared_memory_object::remove("MySharedMemory"); }
~shm_remove(){ shared_memory_object::remove("MySharedMemory"); }
} remover;
//Create a shared memory object.
shared_memory_object shm
(create_only //only create
,"MySharedMemory" //name
,read_write //read-write mode
);
//Set size
shm.truncate(sizeof(shared_memory_buffer));
//Map the whole shared memory in this process
mapped_region region
(shm //What to map
,read_write //Map it as read-write
);
//Get the address of the mapped region
void * addr = region.get_address();
//Construct the shared structure in memory
shared_memory_buffer * data = new (addr) shared_memory_buffer;
const int NumMsg = 100;
//Insert data in the array
for(int i = 0; i < NumMsg; ++i){
data->nempty.wait();
data->mutex.wait();
data->items[i % shared_memory_buffer::NumItems] = i;
data->mutex.post();
data->nstored.post();
}
return 0;
}
第二個程序開啟共享記憶體,然後拷貝接收到的整數至其自身的共享記憶體:
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <iostream>
#include "doc_anonymous_semaphore_shared_data.hpp"
using namespace boost::interprocess;
int main ()
{
//Remove shared memory on destruction
struct shm_remove
{
~shm_remove(){ shared_memory_object::remove("MySharedMemory"); }
} remover;
//Create a shared memory object.
shared_memory_object shm
(open_only //only create
,"MySharedMemory" //name
,read_write //read-write mode
);
//Map the whole shared memory in this process
mapped_region region
(shm //What to map
,read_write //Map it as read-write
);
//Get the address of the mapped region
void * addr = region.get_address();
//Obtain the shared structure
shared_memory_buffer * data = static_cast<shared_memory_buffer*>(addr);
const int NumMsg = 100;
int extracted_data [NumMsg];
//Extract the data
for(int i = 0; i < NumMsg; ++i){
data->nstored.wait();
data->mutex.wait();
extracted_data[i] = data->items[i % shared_memory_buffer::NumItems];
data->mutex.post();
data->nempty.post();
}
return 0;
}
同樣的程序間通訊也可以採用條件變數和互斥量來完成,但這幾個同步模式中,訊號量比互斥量/條件變數組合更高效。
升級互斥量
什麼是一個升級互斥量 升級互斥量的操作 Boost.Interprocess升級互斥量型別和標頭檔案 共享鎖和升級鎖 |
什麼是一個升級互斥量
升級互斥量是一種特殊的互斥量,它比普通的互斥量提供了更多的鎖定可能性。有時,我們能夠區分讀資料和修改資料。如果僅有某些執行緒需要修改資料,並且一個普通的互斥量被用來保護資料被並行訪問,併發性是相當有限的:兩個僅讀取資料的執行緒會被序列化,而不是被併發執行。
如果我們允許對僅讀取資料的執行緒的併發訪問,但是我們避免對讀/寫執行緒的併發訪問,我們就能夠提升效能。這在如下應用中尤其明顯:當讀資料比修改資料更頻繁並且同步資料讀取的程式碼需要一定的時間來執行。採用升級互斥量,我們能夠獲得3種鎖型別:
- 獨佔鎖:與普通互斥量類似。如果一個執行緒獲取了一個獨佔鎖,其他任何執行緒都不能獲取此鎖(互斥或其他方式)直到解鎖。如果某一執行緒有一個共享或升級鎖,則另一個企圖獲取獨佔鎖的執行緒會阻塞。這種鎖將被會修改資料的執行緒所擁有。
- 共享鎖:如果一個執行緒獲取了一個共享鎖,其他執行緒也可以獲取一個共享鎖或升級鎖。如果任一執行緒獲取了獨佔鎖,則另一企圖獲取共享鎖的執行緒會被阻塞。這種鎖被僅需要讀取資料的執行緒執行。
- 升級鎖:獲取一個升級鎖與獲取一個特權共享鎖類似。如果一個程序獲取了一個升級鎖,另外的執行緒可以獲取一個共享鎖。如果任一執行緒已經獲取了獨佔鎖或升級鎖,則其他企圖獲取升級鎖的執行緒會被阻塞。當其他已經獲取了共享鎖的執行緒釋放了此鎖,一個已經獲取一個升級鎖的執行緒被保證能夠原子獲取一個獨佔鎖。它被用於如下執行緒中:此執行緒可能需要修改資料,但通常情況下它僅需要讀取資料。這種執行緒獲取升級鎖,其他執行緒能獲取共享鎖。如果升級的執行緒讀資料並且它需要修改資料時,此執行緒能夠被提升至獲取獨佔鎖:當所有共享執行緒已經釋放了共享鎖時,升級鎖被原子提升至獨佔鎖。這個新被提升的執行緒能夠修改資料,並且能夠保證在過渡時,沒有其他執行緒對資料修改。僅僅一個執行緒能獲取升級(特權讀)鎖。
總結一下:
表 12.5. 鎖定的可能性
如果一個執行緒已經獲取… |
其他執行緒能夠獲取… |
共享鎖 |
許多共享鎖和一個升級鎖 |
升級鎖 |
許多共享鎖 |
獨佔鎖 |
無 |
一個已經獲取了一個鎖的執行緒能夠嘗試原子獲取另一型別的鎖。所有鎖的轉換不保證能成功。儘管一個轉換能保證成功,一些轉換將阻塞執行緒直到其他執行緒釋放了共享鎖。原子操作意味著沒有其他執行緒在轉換中能夠獲得一個升級或獨佔鎖,因此資料能夠保證未被修改:
表 12.6. 轉換的可能性
如果一個執行緒已經獲取… |
它能原子釋放之前的鎖,並且… |
共享鎖 |
嘗試立刻獲取(不保證成功)獨佔鎖,如果沒有其他執行緒佔有互斥或升級鎖 |
共享鎖 |
嘗試立刻獲取(不保證成功)升級鎖,如果沒有其他執行緒佔有互斥或升級鎖 |
升級鎖 |
當所有共享鎖被釋放後,獲取獨佔鎖 |
升級鎖 |
立刻獲取共享鎖 |
獨佔鎖 |
立刻獲取升級鎖 |
獨佔鎖 |
立刻獲取共享鎖 |
正如我們所看到的,升級互斥量是一個強大的同步工具,它能夠提升併發效能。然後,如果大多數情況下我們都需要修改資料,或同步程式碼片段非常短,則使用普通互斥量效率更高,因為它具有較小的開銷。當同步程式碼片段比較大並且讀比寫多時,升級鎖才能發揮出它的光芒。
升級互斥量的操作
獨佔鎖定 共享鎖定 升級鎖定 降級 升級 |
所有Boost.Interprocess中的升級互斥型別均可以執行如下操作:
獨佔鎖定
void lock()
效果:呼叫者執行緒嘗試獲取互斥量的獨佔所有權,如果另一個執行緒已經有這個互斥量的獨佔、共享或升級所有權,則執行緒等待直到它獲取所有權為止。
丟擲:interprocess_exception異常。
bool try_lock()
效果:呼叫者執行緒嘗試獲取互斥量的獨佔所有權而不等待。如果沒有其他執行緒具有此互斥量的獨佔、共享或升級所有權,則獲取成功。
返回:如果能立即獲得獨佔所有權,則返回true。如果需要等待,返回false。
丟擲:interprocess_exception異常。
bool timed_lock(constboost::posix_time::ptime &abs_time)
效果:呼叫者執行緒嘗試獲取互斥量的獨佔所有權直到沒有其他執行緒具有此互斥量的獨佔、共享或升級所有權或者是達到abs_time。
返回:如果獲得獨佔所有權,則返回true。如果需要等待,返回false。
丟擲:interprocess_exception異常。
void unlock()
前提條件:執行緒必須有互斥量的獨佔所有權。
效果:呼叫者執行緒釋放互斥量的獨佔所有權。
丟擲:一個派生自interprocess_exception的異常。
共享鎖定
void lock_sharable()
效果:呼叫者執行緒嘗試獲取互斥量的共享所有權,並且如果另一個執行緒已經有這個互斥量的獨佔或升級所有權,則執行緒等待直到它獲取所有權為止。
丟擲:interprocess_exception異常。
bool try_lock_sharable()
效果:呼叫者執行緒嘗試獲取互斥量的共享所有權而不等待。如果沒有其他執行緒具有此互斥量的獨佔或升級所有權,則獲取成功。
返回:如果能立即獲得獨佔所有權,則返回true。如果需要等待,返回false。
丟擲:interprocess_exception異常。
bool timed_lock_sharable(constboost::posix_time::ptime &abs_time)
效果:呼叫者執行緒嘗試獲取互斥量的共享所有權直到沒有其他執行緒具有此互斥量的獨佔或升級所有權或者是達到abs_time。
返回:如果獲得共享所有權,則返回true。否則返回false。
丟擲:interprocess_exception異常。
void unlock_sharable()
前提條件:執行緒必須有互斥量的共享所有權。
效果:呼叫者執行緒釋放互斥量的共享所有權。
丟擲:一個派生自interprocess_exception的異常。
升級鎖定
void lock_upgradable()
效果:呼叫者執行緒嘗試獲取互斥量的升級所有權,並且如果另一個執行緒已經有這個互斥量的獨佔或升級所有權,則執行緒等待直到它獲取所有權為止。
丟擲:interprocess_exception異常。
bool try_lock_upgradable ()
效果:呼叫者執行緒嘗試獲取互斥量的升級所有權而不等待。如果沒有其他執行緒具有此互斥量的獨佔或升級所有權,則獲取成功。
返回:如果能立即獲得升級所有權,則返回true。如果需要等待,返回false。
丟擲:interprocess_exception異常。
booltimed_lock_upgradable(const boost::posix_time::ptime &abs_time)
效果:呼叫者執行緒嘗試獲取互斥量的升級所有權直到沒有其他執行緒具有此互斥量的獨佔或升級所有權或者是達到abs_time。
返回:如果獲得升級所有權,則返回true。否則返回false。
丟擲:interprocess_exception異常。
void unlock_upgradable()
前提條件:執行緒必須有互斥量的升級所有權。
效果:呼叫者執行緒釋放互斥量的升級所有權。
丟擲:一個派生自interprocess_exception的異常。
降級
voidunlock_and_lock_upgradable()
前提條件:執行緒必須有互斥量的獨佔所有權。
效果:執行緒原子釋放獨佔所有權並且獲得升級所有權。此操作不阻塞。
丟擲:一個派生自interprocess_exception的異常。
void unlock_and_lock_sharable()
前提條件:執行緒必須有互斥量的獨佔所有權。
效果:執行緒原子釋放獨佔所有權並且獲得共享所有權。此操作不阻塞。
丟擲:一個派生自interprocess_exception的異常。
voidunlock_upgradable_and_lock_sharable()
前提條件:執行緒必須有互斥量的升級所有權。
效果:執行緒原子釋放升級所有權並且獲得共享所有權。此操作不阻塞。
丟擲:一個派生自interprocess_exception的異常。
升級
voidunlock_upgradable_and_lock()
前提條件:執行緒必須有互斥量的升級所有權。
效果:執行緒原子釋放升級所有權並且獲得獨佔所有權。此操作將阻塞直到所有具有共享所有權的執行緒釋放它為止。
丟擲:一個派生自interprocess_exception的異常。
booltry_unlock_upgradable_and_lock()
前提條件:執行緒必須有互斥量的升級所有權。
效果:執行緒原子釋放升級所有權並且嘗試獲得獨佔所有權。此操作將失敗,如果有執行緒具有共享所有權。但是它會保持升級所有權。
返回:如果獲得獨佔所有權,返回true。否則返回false。
丟擲:一個派生自interprocess_exception的異常。
booltimed_unlock_upgradable_and_lock(const boost::posix_time::ptime &abs_time)
前提條件:執行緒必須有互斥量的升級所有權。
效果:執行緒原子釋放升級所有權並且嘗試獲得獨佔所有權,如果需要,等待直到abs_time。此操作將失敗,如果有執行緒具有共享所有權或超時。但是它會保持升級所有權。
返回:如果獲得獨佔所有權,返回true。否則返回false。
丟擲:一個派生自interprocess_exception的異常。
booltry_unlock_sharable_and_lock()
前提條件:執行緒必須有互斥量的共享所有權。
效果:執行緒原子釋放共享所有權並且嘗試獲得獨佔所有權。此操作將失敗,如果有執行緒具有共享或升級所有權。但是它會保持共享所有權。
返回:如果獲得獨佔所有權,返回true。否則返回false。
丟擲:一個派生自interprocess_exception的異常。
booltry_unlock_sharable_and_lock_upgradable()
前提條件:執行緒必須有互斥量的共享所有權。
效果:執行緒原子釋放共享所有權並且嘗試獲得升級所有權。此操作將失敗,如果有執行緒具有共享或升級所有權。但是它會保持共享所有權。
返回:如果獲得升級所有權,返回true。否則返回false。
丟擲:一個派生自interprocess_exception的異常。
Boost.Interprocess升級互斥量型別和標頭檔案
Boost.Interprocess提供如下的升級互斥量型別:
#include <boost/interprocess/sync/interprocess_upgradable_mutex.hpp>
- interprocess_upgradable_mutex:一種非遞迴的、匿名的升級互斥量,它能被置於共享記憶體或記憶體對映檔案中。
#include <boost/interprocess/sync/named_upgradable_mutex.hpp>
- named_upgradable_mutex:一種非遞迴的,具名的升級互斥量。
共享鎖和升級鎖
共享鎖和升級鎖標頭檔案
和普通互斥量一樣,甚至在發生異常時,也能及時釋放鎖是很重要的。 Boost.Interprocess互斥量最好使用scoped_lock工具,並且這個類僅提供獨佔鎖定。
因為我們有采用升級互斥量的共享鎖定和升級鎖定,因此我們有兩種新工具:sharable_lock 和 upgradable_lock。這兩個類都與scoped_lock類似,但是 sharable_lock在構造時需要共享鎖,upgradable_lock在構造時需要升級鎖。
這兩個工具能與任何能夠提供需要操作的同步物件一起使用。例如,一個使用者自定義的無升級鎖特徵的互斥量型別能夠使用sharable_lock ,如果同步物件提供lock_sharable() 和unlock_sharable()操作。
共享鎖和升級鎖標頭檔案
#include <boost/interprocess/sync/sharable_lock.hpp>
#include <boost/interprocess/sync/upgradable_lock.hpp>
共享鎖在解構函式中呼叫unlock_sharable(),升級鎖在解構函式中呼叫 unlock_upgradable(),因此當有異常發生時,升級互斥量也總能解鎖。區域性鎖有許多建構函式來lock, try_lock, timed_lock 一個互斥量或解鎖。
using namespace boost::interprocess;
//Let's create any mutex type:
MutexType mutex;
{
//This will call lock_sharable()
sharable_lock<MutexType> lock(mutex);
//Some code
//The mutex will be unlocked here
}
{
//This won't lock the mutex()
sharable_lock<MutexType> lock(mutex, defer_lock);
//Lock it on demand. This will call lock_sharable()
lock.lock();
//Some code
//The mutex will be unlocked here
}
{
//This will call try_lock_sharable()
sharable_lock<MutexType> lock(mutex, try_to_lock);
//Check if the mutex has been successfully locked
if(lock){
//Some code
}
//If the mutex was locked it will be unlocked
}
{
boost::posix_time::ptime abs_time = ...
//This will call timed_lock_sharable()
scoped_lock<MutexType> lock(mutex, abs_time);
//Check if the mutex has been successfully locked
if(lock){
//Some code
}
//If the mutex was locked it will be unlocked
}
{
//This will call lock_upgradable()
upgradable_lock<MutexType> lock(mutex);
//Some code
//The mutex will be unlocked here
}
{
//This won't lock the mutex()
upgradable_lock<MutexType> lock(mutex, defer_lock);
//Lock it on demand. This will call lock_upgradable()
lock.lock();
//Some code
//The mutex will be unlocked here
}
{
//This will call try_lock_upgradable()
upgradable_lock<MutexType> lock(mutex, try_to_lock);
//Check if the mutex has been successfully locked
if(lock){
//Some code
}
//If the mutex was locked it will be unlocked
}
{
boost::posix_time::ptime abs_time = ...
//This will call timed_lock_upgradable()
scoped_lock<MutexType> lock(mutex, abs_time);
//Check if the mutex has been successfully locked
if(lock){
//Some code
}
//If the mutex was locked it will be unlocked
}
upgradable_lock
和sharable_lock
提供更多的特性和操作,更多詳情可查閱它們的參考手冊。
通過移動語義轉移鎖
簡單鎖轉移 鎖轉移概要 轉移未鎖定的鎖 轉移失敗 |
程序間通訊為不支援右值引用的編譯器使用它自己的轉移語義模擬程式碼。這只是一個臨時解決方案直到Boost轉移語義庫被接受。
區域性所以及類似的工具提供了簡單資源管理的可能性,但是採用例如升級互斥量這樣的高階互斥量型別,就有如下操作:一個獲取的鎖型別被釋放,另一個鎖型別被原子獲取。這由升級鎖操作,例如unlock_and_lock_sharable()完成。
使用鎖轉移操作,這些操作能夠有效的進行。一個鎖轉移操作顯式指出一個鎖擁有的互斥量通過執行原子解鎖/加鎖操作被轉移到另一個鎖。
簡單鎖轉移
考慮一個執行緒在開始的時候會修改一些資料,但之後,它在很長的時間裡僅讀取資料。程式碼可以獲取獨佔鎖,修改資料然後原子釋放獨佔鎖並且獲取共享鎖。通過這一系列操作,我們可以保證沒有其他執行緒在轉移中能修改資料,並且更多的讀執行緒能獲取共享鎖,從而提高併發性。如果沒有鎖轉移操作,程式碼將編寫為如下:
using boost::interprocess;
interprocess_upgradable_mutex mutex;
//Acquire exclusive lock
mutex.lock();
//Modify data
//Atomically release exclusive lock and acquire sharable lock.
//More threads can acquire the sharable lock and read the data.
mutex.unlock_and_lock_sharable();
//Read data
//Explicit unlocking
mutex.unlock_sharable();
這很簡單,但是如果出現異常,就很難知道當異常丟擲的時候互斥量擁有什麼型別的鎖,以及我們應該呼叫什麼函式去解鎖它。
try{
//Mutex operations
}
catch(...){
//What should we call? "unlock()" or "unlock_sharable()"
//Is the mutex locked?
}
我們可以使用鎖轉移來簡化所有的這些管理:
using boost::interprocess;
interprocess_upgradable_mutex mutex;
//Acquire exclusive lock
scoped_lock s_lock(mutex);
//Modify data
//Atomically release exclusive lock and acquire sharable lock.
//More threads can acquire the sharable lock and read the data.
sharable_lock(move(s_lock));
//Read data
//The lock is automatically unlocked calling the appropriate unlock
//function even in the presence of exceptions.
//If the mutex was not locked, no function is called.
如我們所看到的,無論在任何時候丟擲異常,互斥量都能呼叫適當的unlock()或unlock_sharable()方法自動解鎖。
鎖轉移概要
轉移到區域性鎖 轉移到升級鎖 轉移到共享鎖 |
有許多鎖轉移操作,我們可以根據出現在升級互斥量操作中的操作來分類:
- 保證成功和非阻塞:任何從更嚴格的鎖至次嚴格的鎖。區域性的->升級的、區域性的->共享的、升級的->共享的。
- 不保證成功:如果沒有其他執行緒已經獲取此升級或獨佔鎖,則從共享的->獨佔的操作可能成功。此操作是一個嘗試操作。
- 如果無限等待,保證能成功:從升級的->區域性的轉換將成功,但需要等待所有共享鎖被釋放。由於這是一個阻塞操作,我們也可以選擇不無限等待而僅僅嘗試或僅等待超時即可。
轉移到區域性鎖
轉移到區域性鎖僅在從upgradable_lock且請求阻塞操作情況下保證成功,這源於此操作需要等待直到所有的共享鎖被釋放的事實。使用者可以使用”try”或”timed”轉移來避免無限鎖定,但成功性得不到保證。
從sharable_lock 的轉換從來得不到保證,因此僅允許嘗試操作:
//Conversions to scoped_lock
{
upgradable_lock<Mutex> u_lock(mut);
//This calls unlock_upgradable_and_lock()
scoped_lock<Mutex> e_lock(move(u_lock));
}
{
upgradable_lock<Mutex> u_lock(mut);
//This calls try_unlock_upgradable_and_lock()
scoped_lock<Mutex> e_lock(move(u_lock, try_to_lock));
}
{
boost::posix_time::ptime t = test::delay(100);
upgradable_lock<Mutex> u_lock(mut);
//This calls timed_unlock_upgradable_and_lock()
scoped_lock<Mutex> e_lock(move(u_lock));
}
{
sharable_lock<Mutex> s_lock(mut);
//This calls try_unlock_sharable_and_lock()
scoped_lock<Mutex> e_lock(move(s_lock, try_to_lock));
}
轉移到升級鎖
僅當從 scoped_lock轉移為upgradable_lock 是保證成功的,因為區域性鎖是比升級鎖更嚴格的鎖定。此操作非阻塞。
從sharable_lock 的轉換不保證成功,因此僅允許嘗試操作:
//Conversions to upgradable
{
sharable_lock<Mutex> s_lock(mut);
//This calls try_unlock_sharable_and_lock_upgradable()
upgradable_lock<Mutex> u_lock(move(s_lock, try_to_lock));
}
{
scoped_lock<Mutex> e_lock(mut);
//This calls unlock_and_lock_upgradable()
upgradable_lock<Mutex> u_lock(move(e_lock));
}
轉移到共享鎖
所有轉移至sharable_lock都保證是成功的,因為升級鎖和區域性所都比共享鎖嚴格。這些操作非阻塞:
//Conversions to sharable_lock
{
upgradable_lock<Mutex> u_lock(mut);
//This calls unlock_upgradable_and_lock_sharable()
sharable_lock<Mutex> s_lock(move(u_lock));
}
{
scoped_lock<Mutex> e_lock(mut);
//This calls unlock_and_lock_sharable()
sharable_lock<Mutex> s_lock(move(e_lock));
}
轉移未鎖定的鎖
在之前的例子中,使用在轉移操作中的互斥量預先被鎖定了:
Mutex mut;
//This calls mut.lock()
scoped_lock<Mutex> e_lock(mut);
//This calls unlock_and_lock_sharable()
sharable_lock<Mutex> s_lock(move(e_lock));
}
但是使用一個未鎖定的源來執行轉移是可能的,基於顯式解鎖、try/timed或defer_lock建構函式:
//These operations can leave the mutex unlocked!
{
//Try might fail
scoped_lock<Mutex> e_lock(mut, try_to_lock);
sharable_lock<Mutex> s_lock(move(e_lock));
}
{
//Timed operation might fail
scoped_lock<Mutex> e_lock(mut, time);
sharable_lock<Mutex> s_lock(move(e_lock));
}
{
//Avoid mutex locking
scoped_lock<Mutex> e_lock(mut, defer_lock);
sharable_lock<Mutex> s_lock(move(e_lock));
}
{
//Explicitly call unlock
scoped_lock<Mutex> e_lock(mut);
e_lock.unlock();
//Mutex was explicitly unlocked
sharable_lock<Mutex> s_lock(move(e_lock));
}
如果源互斥量未被鎖定:
- 目標鎖不執行原子unlock_xxx_和_lock_xxx操作。
- 目標鎖未被鎖定。
- 源鎖被釋放並且互斥量的所有權被轉移至目標。
{
scoped_lock<Mutex> e_lock(mut, defer_lock);
sharable_lock<Mutex> s_lock(move(e_lock));
//Assertions
assert(e_lock.mutex() == 0);
assert(s_lock.mutex() != 0);
assert(e_lock.owns() == false);
}
轉移失敗
當執行鎖轉移時,操作可能失敗:
- 被執行的原子互斥量解鎖加鎖函式可能會丟擲異常。
- 被執行的原子函式可能是”try”或”timed”函式,它們可能會失敗。
在第一種情況中,互斥量的所有權沒有轉移並且源鎖的解構函式會解鎖互斥量:
{
scoped_lock<Mutex> e_lock(mut, defer_lock);
//This operations throws because
//"unlock_and_lock_sharable()" throws!!!
sharable_lock<Mutex> s_lock(move(e_lock));
//Some code ...
//e_lock's destructor will call "unlock()"
}
在第二種情況中,如果一個內部的 "try"或"timed"操作失敗(返回”false”),則互斥量的所有權沒有轉移,源鎖未改變並且目標鎖的狀態與預設的構造相同:
{
sharable_lock<Mutex> s_lock(mut);
//Internal "try_unlock_sharable_and_lock_upgradable()" returns false
upgradable_lock<Mutex> u_lock(move(s_lock, try_to_lock));
assert(s_lock.mutex() == &mut);
assert(s_lock.owns() == true);
assert(u_lock.mutex() == 0);
assert(u_lock.owns() == false);
//u_lock's destructor does nothing
//s_lock's destructor calls "unlock()"
}
檔案鎖
什麼是檔案鎖 檔案鎖操作 檔案鎖定中的區域性鎖和共享鎖 小心:同步限制 小心iostream寫入 |
什麼是檔案鎖?
檔案鎖是一種程序間通訊機制,它使用一個嵌入至檔案中的互斥量來保護檔案讀寫的併發操作。這種嵌入式互斥量具有共享鎖和獨佔鎖的能力。採用檔案鎖,現有的檔案可以做為一個互斥量,而不需要建立額外的同步物件來控制檔案讀寫的併發性。
一般來說,我們有兩種檔案鎖定能力:
- 諮詢鎖定:作業系統核心維護一個已被鎖定檔案的列表。但是甚至一個程序已經獲得了一個共享鎖,也不阻止向這些檔案寫資料或當一個程序已經獲取了獨佔鎖,也不阻止從檔案中讀資料。任何程序可以忽略一個諮詢鎖。這意味著諮詢鎖是用於合作程序的,程序間彼此信任。這與保護共享記憶體片內資料的互斥量類似:任何連線至那塊記憶體的程序可以覆蓋資料,但是合作的程序使用互斥量來保護資料首先獲取互斥鎖。
- 強制鎖定:作業系統核心檢查每次讀寫請求,從而校驗操作能按照獲取的鎖來執行。讀/寫均被阻塞直到鎖被釋放。
Boost.Interprocess 基於可移植性的原因使用諮詢鎖定。這意味著所有併發訪問檔案的程序需要合作使用檔案鎖來同步訪問。
在一些系統中,檔案鎖定甚至能被進一步完善成記錄鎖定,在此情況下使用者能在檔案