圓形緩衝區(迴圈buffer)實現【轉】
(轉自:https://blog.csdn.net/hbuxiaofei/article/details/51463662#commentBox)
用法
圓形緩衝區的一個有用特性是:當一個數據元素被用掉後,其餘資料元素不需要移動其儲存位置。相反,一個非圓形緩衝區(例如一個普通的佇列)在用掉一個數據元素後,其餘資料元素需要向前搬移。換句話說,圓形緩衝區適合實現先進先出緩衝區,而非圓形緩衝區適合後進先出緩衝區。
圓形緩衝區適合於事先明確了緩衝區的最大容量的情形。擴充套件一個圓形緩衝區的容量,需要搬移其中的資料。因此一個緩衝區如果需要經常調整其容量,用連結串列實現更為合適。
寫操作覆蓋圓形緩衝區中未被處理的資料在某些情況下是允許的。特別是在多媒體處理時。例如,音訊的生產者可以覆蓋掉
工作過程
一個圓形緩衝區最初為空並有預定的長度。例如,這是一個具有七個元素空間的圓形緩衝區,其中底部的單線與箭頭表示“頭尾相接”形成一個圓形地址空間:
假定1被寫入緩衝區中部(對於圓形緩衝區來說,最初的寫入位置在哪裡是無關緊要的):
再寫入2個元素,分別是2 & 3 — 被追加在1之後:
如果兩個元素被處理,那麼是緩衝區中最老的兩個元素被移除。在本例中,1 & 2被移除,緩衝區中只剩下3:
如果緩衝區中有7個元素,則是滿的:
如果緩衝區是滿的,又要寫入新的資料,一種策略是覆蓋掉最老的資料。此例中,2個新資料— A & B — 寫入,覆蓋了3 & 4:
也可以採取其他策略,禁止覆蓋緩衝區的資料,採取返回一個錯誤碼或者丟擲異常。
最終,如果從緩衝區中移除2個數據,不是3 & 4 而是 5 & 6 。因為 A & B 已經覆蓋了3 & 4:
圓形緩衝區工作機制
由於計算機記憶體是線性地址空間,因此圓形緩衝區需要特別的設計才可以從邏輯上實現。讀指標與寫指標
一般的,圓形緩衝區需要4個指標:
- 在記憶體中實際開始位置;
- 在記憶體中實際結束位置,也可以用緩衝區長度代替;
- 儲存在緩衝區中的有效資料的開始位置(讀指標);
- 儲存在緩衝區中的有效資料的結尾位置(寫指標)。
讀指標、寫指標可以用整型值來表示。
下例為一個未滿的緩衝區的讀寫指標:
下例為一個滿的緩衝區的讀寫指標:
區分緩衝區滿或者空
緩衝區是滿、或是空,都有可能出現讀指標與寫指標指向同一位置。有多種策略用於檢測緩衝區是滿、或是空:
總是保持一個儲存單元為空
緩衝區中總是有一個儲存單元保持未使用狀態。緩衝區最多存入個數據。如果讀寫指標指向同一位置,則緩衝區為空。如果寫指標位於讀指標的相鄰後一個位置,則緩衝區為滿。這種策略的優點是簡單、魯棒;缺點是語義上實際可存資料量與緩衝區容量不一致,測試緩衝區是否滿需要做取餘數計算。
使用資料計數
這種策略不使用顯式的寫指標,而是保持著緩衝區記憶體儲的資料的計數。因此測試緩衝區是空是滿非常簡單;對效能影響可以忽略。缺點是讀寫操作都需要修改這個儲存資料計數,對於多執行緒訪問緩衝區需要併發控制。
映象指示位
緩衝區的長度如果是n,邏輯地址空間則為0至n-1;那麼,規定n至2n-1為映象邏輯地址空間。本策略規定讀寫指標的地址空間為0至2n-1,其中低半部分對應於常規的邏輯地址空間,高半部分對應於映象邏輯地址空間。當指標值大於等於2n時,使其折返(wrapped)到ptr-2n。使用一位表示寫指標或讀指標是否進入了虛擬的映象儲存區:置位表示進入,不置位表示沒進入還在基本儲存區。
在讀寫指標的值相同情況下,如果二者的指示位相同,說明緩衝區為空;如果二者的指示位不同,說明緩衝區為滿。這種方法優點是測試緩衝區滿/空很簡單;不需要做取餘數操作;讀寫執行緒可以分別設計專用演算法策略,能實現精緻的併發控制。 缺點是讀寫指標各需要額外的一位作為指示位。
如果緩衝區長度是2的冪,則本方法可以省略映象指示位。如果讀寫指標的值相等,則緩衝區為空;如果讀寫指標相差n,則緩衝區為滿,這可以用條件表示式(寫指標 == (讀指標 異或 緩衝區長度))來判斷。
-
/* This approach adds one bit to end and start pointers */
-
-
/* Circular buffer object */
-
typedef
struct {
-
int size;
/* maximum number of elements */
-
int start;
/* index of oldest element */
-
int end;
/* index at which to write new element */
-
ElemType *elems;
/* vector of elements */
-
} CircularBuffer;
-
-
void cbInit(CircularBuffer *cb, int size) {
-
cb->size = size;
-
cb->start =
0;
-
cb->end =
0;
-
cb->elems = (ElemType *)
calloc(cb->size,
sizeof(ElemType));
-
}
-
-
void cbPrint(CircularBuffer *cb) {
-
printf(
"size=0x%x, start=%d, end=%d\n", cb->size, cb->start, cb->end);
-
}
-
-
int cbIsFull(CircularBuffer *cb) {
-
return cb->end == (cb->start ^ cb->size);
/* This inverts the most significant bit of start before comparison */ }
-
-
int cbIsEmpty(CircularBuffer *cb) {
-
return cb->end == cb->start; }
-
-
int cbIncr(CircularBuffer *cb, int p) {
-
return (p +
1)&(
2*cb->size
-1);
/* start and end pointers incrementation is done modulo 2*size */
-
}
-
-
void cbWrite(CircularBuffer *cb, ElemType *elem) {
-
cb->elems[cb->end&(cb->size
-1)] = *elem;
-
if (cbIsFull(cb))
/* full, overwrite moves start pointer */
-
cb->start = cbIncr(cb, cb->start);
-
cb->end = cbIncr(cb, cb->end);
-
}
-
-
void cbRead(CircularBuffer *cb, ElemType *elem) {
-
*elem = cb->elems[cb->start&(cb->size
-1)];
-
cb->start = cbIncr(cb, cb->start);
-
}
讀/寫 計數
用兩個有符號整型變數分別儲存寫入、讀出緩衝區的資料數量。其差值就是緩衝區中尚未被處理的有效資料的數量。這種方法的優點是讀執行緒、寫執行緒互不干擾;缺點是需要額外兩個變數。
記錄最後的操作
使用一位記錄最後一次操作是讀還是寫。讀寫指標值相等情況下,如果最後一次操作為寫入,那麼緩衝區是滿的;如果最後一次操作為讀出,那麼緩衝區是空。 這種策略的缺點是讀寫操作共享一個標誌位,多執行緒時需要併發控制。
POSIX優化實現
-
#include <sys/mman.h>
-
#include <stdlib.h>
-
#include <unistd.h>
-
-
#define report_exceptional_condition() abort ()
-
-
struct ring_buffer
-
{
-
void *address;
-
-
unsigned
long count_bytes;
-
unsigned
long write_offset_bytes;
-
unsigned
long read_offset_bytes;
-
};
-
-
//Warning order should be at least 12 for Linux
-
void
-
ring_buffer_create
(struct ring_buffer *buffer, unsigned long order)
-
{
-
char path[] =
"/dev/shm/ring-buffer-XXXXXX";
-
int file_descriptor;
-
void *address;
-
int status;
-
-
file_descriptor = mkstemp (path);
-
if (file_descriptor <
0)
-
report_exceptional_condition ();
-
-
status = unlink (path);
-
if (status)
-
report_exceptional_condition ();
-
-
buffer->count_bytes =
1UL << order;
-
buffer->write_offset_bytes =
0;
-
buffer->read_offset_bytes =
0;
-
-
status = ftruncate (file_descriptor, buffer->count_bytes);
-
if (status)
-
report_exceptional_condition ();
-
-
buffer->address = mmap (
NULL, buffer->count_bytes <<
1, PROT_NONE,
-
MAP_ANONYMOUS | MAP_PRIVATE,
-1,
0);
-
-
if (buffer->address == MAP_FAILED)
-
report_exceptional_condition ();
-
-
address =
-
mmap (buffer->address, buffer->count_bytes, PROT_READ | PROT_WRITE,
-
MAP_FIXED | MAP_SHARED, file_descriptor,
0);
-
-
if (address != buffer->address)
-
report_exceptional_condition ();
-
-
address = mmap (buffer->address + buffer->count_bytes,
-
buffer->count_bytes, PROT_READ | PROT_WRITE,
-
MAP_FIXED | MAP_SHARED, file_descriptor,
0);
-
-
if (address != buffer->address + buffer->count_bytes)
-
report_exceptional_condition ();
-
-
status = close (file_descriptor);
-
if (status)
-
report_exceptional_condition ();
-
}
-
-
void
-
ring_buffer_free
(struct ring_buffer *buffer)
-
{
-
int status;
-
-
status = munmap (buffer->address, buffer->count_bytes <<
1);
-
if (status)
-
report_exceptional_condition ();
-
}
-
-
void *
-
ring_buffer_write_address
(struct ring_buffer *buffer)
-
{
-
/*** void pointer arithmetic is a constraint violation. ***/
-
return buffer->address + buffer->write_offset_bytes;
-
}
-
-
void
-
ring_buffer_write_advance
(struct ring_buffer *buffer,
-
unsigned
long count_bytes)
-
{
-
buffer->write_offset_bytes += count_bytes;
-
}
-
-
void *
-
ring_buffer_read_address
(struct ring_buffer *buffer)
-
{
-
return buffer->address + buffer->read_offset_bytes;
-
}
-
-
void
-
ring_buffer_read_advance
(struct ring_buffer *buffer,
-
unsigned
long count_bytes)
-
{
-
buffer->read_offset_bytes += count_bytes;
-
-
if (buffer->read_offset_bytes >= buffer->count_bytes)
-
{
/*如果讀指標大於等於緩衝區長度,那些讀寫指標同時折返回[0, buffer_size]範圍內 */
-
buffer->read_offset_bytes -= buffer->count_bytes;
-
buffer->write_offset_bytes -= buffer->count_bytes;
-
}
-
}
-
-
unsigned long
-
ring_buffer_count_bytes
(struct ring_buffer *buffer)
-
{
-
return buffer->write_offset_bytes - buffer->read_offset_bytes;
-
}
-
-
unsigned long
-
ring_buffer_count_free_bytes
(struct ring_buffer *buffer)
-
{
-
return buffer->count_bytes - ring_buffer_count_bytes (buffer);
-
}
-
-
void
-
ring_buffer_clear
(struct ring_buffer *buffer)
-
{
-
buffer->write_offset_bytes =
0;
-
buffer->read_offset_bytes =
0;
-
}
-
-
/*Note, that initial anonymous mmap() can be avoided - after initial mmap() for descriptor fd,
-
you can try mmap() with hinted address as (buffer->address + buffer->count_bytes) and if it fails -
-
another one with hinted address as (buffer->address - buffer->count_bytes).
-
Make sure MAP_FIXED is not used in such case, as under certain situations it could end with segfault.
-
The advantage of such approach is, that it avoids requirement to map twice the amount you need initially
-
(especially useful e.g. if you want to use hugetlbfs and the allowed amount is limited)
-
and in context of gcc/glibc - you can avoid certain feature macros
-
(MAP_ANONYMOUS usually requires one of: _BSD_SOURCE, _SVID_SOURCE or _GNU_SOURCE).*/
Linux核心的kfifo
在Linux核心檔案kfifo.h和kfifo.c中,定義了一個先進先出圓形緩衝區實現。如果只有一個讀執行緒、一個寫執行緒,二者沒有共享的被修改的控制變數,那麼可以證明這種情況下不需要併發控制。kfifo就滿足上述條件。kfifo要求緩衝區長度必須為2的冪。讀、寫指標分別是無符號整型變數。把讀寫指標變換為緩衝區內的索引值,僅需要“按位與”操作:(指標值 按位與 (緩衝區長度-1))。這避免了計算代價高昂的“求餘”操作。且下述關係總是成立:
- 讀指標 + 緩衝區儲存的資料長度 == 寫指標
即使在寫指標達到了無符號整型的上界,上溢位後寫指標的值小於讀指標的值,上述關係仍然保持成立(這是因為無符號整型加法的性質)。 kfifo的寫操作,首先計算緩衝區中當前可寫入儲存空間的資料長度:
- len = min{待寫入資料長度, 緩衝區長度 - (寫指標 - 讀指標)}
然後,分兩段寫入資料。第一段是從寫指標開始向緩衝區末尾方向;第二段是從緩衝區起始處寫入餘下的可寫入資料,這部分可能資料長度為0即並無實際資料寫入。