剖析nsq訊息佇列(三) 訊息傳輸的可靠性和持久化[二]diskqueue
上一篇主要說了一下nsq是如何保證訊息被消費端成功消費,大概提了一下訊息的持久化,--mem-queue-size
設定為 0,所有的訊息將會儲存到磁碟。
總有人說nsq
的持久化問題,消除疑慮的方法就是閱讀原碼做benchmark測試,個人感覺nsq
還是很靠譜的。
nsq
自己實現了一個先進先出的訊息檔案佇列go-diskqueue是把訊息儲存到本地檔案內,很值得分析一下他的實現過程。
整體處理邏輯
go-diskqueue
會啟動一個gorouting
進行讀寫資料也就是方法ioLoop
會根據你設定的引數來進行資料的讀寫,流程圖如下
這個圖畫的也不是特別的準確
ioLoop
用的是select
並不是if else
當有多個條件為true
時,會隨機選一個進行執行
nsq
生成的資料大致如下:
xxxx.diskqueue.meta.dat
元資料儲存了未讀訊息的長度,讀取和存入資料的編號和讀取位置
xxxx.diskqueue.編號.dat
訊息儲存的檔案,每一個訊息的儲存:4Byte訊息的長度+訊息
引數說明
一些主要的引數和約束說明
這些引數的使用在後面的處理邏輯中會提到
// diskQueue implements a filesystem backed FIFO queue type diskQueue struct { // run-time state (also persisted to disk) // 讀取資料的位置 readPos int64 // 寫入資料的位置 writePos int64 // 讀取檔案的編號 readFileNum int64 // 寫入檔案的編號 writeFileNum int64 // 未處理的訊息總數 depth int64 // instantiation time metadata // 每個檔案的大小限制 maxBytesPerFile int64 // currently this cannot change once created // 每條訊息的最小大小限制 minMsgSize int32 // 每條訊息的最大大小限制 maxMsgSize int32 // 快取訊息有多少條後進行寫入 syncEvery int64 // number of writes per fsync // 自動寫入訊息檔案的時間間隔 syncTimeout time.Duration // duration of time per fsync exitFlag int32 needSync bool // keeps track of the position where we have read // (but not yet sent over readChan) // 下一條訊息的位置 nextReadPos int64 // 下一條訊息的檔案編號 nextReadFileNum int64 // 讀取的檔案 readFile *os.File // 寫入的檔案 writeFile *os.File // 讀取的buffer reader *bufio.Reader // 寫入的buffer writeBuf bytes.Buffer // exposed via ReadChan() // 讀取資料的channel readChan chan []byte //..... }
資料
元資料
讀寫資料資訊的元資料儲存在xxxxx.diskqueue.meta.data檔案內主要用到程式碼裡的欄位如下
未處理的訊息總數 depth
讀取檔案的編號 readFileNum
讀取資料的位置 readPos
寫入檔案的編號 writeFileNum
寫入資料的位置 writePos
真實資料如下
15
0,22
3,24
儲存元資料資訊
func (d *diskQueue) persistMetaData() error { // ... fileName := d.metaDataFileName() tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int()) // write to tmp file f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600) // 元資料資訊 _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", atomic.LoadInt64(&d.depth), d.readFileNum, d.readPos, d.writeFileNum, d.writePos) // 儲存 f.Sync() f.Close() // atomically rename return os.Rename(tmpFileName, fileName) }
得到元資料資訊
func (d *diskQueue) retrieveMetaData() error {
// ...
fileName := d.metaDataFileName()
f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
// 讀取資料並賦值
var depth int64
_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
&depth,
&d.readFileNum, &d.readPos,
&d.writeFileNum, &d.writePos)
//...
atomic.StoreInt64(&d.depth, depth)
d.nextReadFileNum = d.readFileNum
d.nextReadPos = d.readPos
return nil
}
訊息資料
寫入一條資料
ioLoop
中發現有資料寫入時,會呼叫writeOne
方法,把訊息儲存到檔案內
select {
// ...
case dataWrite := <-d.writeChan:
count++
d.writeResponseChan <- d.writeOne(dataWrite)
// ...
func (d *diskQueue) writeOne(data []byte) error {
var err error
if d.writeFile == nil {
curFileName := d.fileName(d.writeFileNum)
d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
// ...
if d.writePos > 0 {
_, err = d.writeFile.Seek(d.writePos, 0)
// ...
}
}
dataLen := int32(len(data))
// 判斷訊息的長度是否合法
if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize)
}
d.writeBuf.Reset()
// 寫入4位元組的訊息長度,以大端序儲存
err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
if err != nil {
return err
}
// 寫入訊息
_, err = d.writeBuf.Write(data)
if err != nil {
return err
}
// 寫入到檔案
_, err = d.writeFile.Write(d.writeBuf.Bytes())
// ...
// 計算寫入位置,訊息數量加1
totalBytes := int64(4 + dataLen)
d.writePos += totalBytes
atomic.AddInt64(&d.depth, 1)
// 如果寫入位置大於 單個檔案的最大限制, 則持久化檔案到硬碟
if d.writePos > d.maxBytesPerFile {
d.writeFileNum++
d.writePos = 0
// sync every time we start writing to a new file
err = d.sync()
// ...
}
return err
}
寫入完訊息後,會判斷當前的檔案大小是否已經已於maxBytesPerFile
如果大,就持久化檔案到硬碟,然後重新開啟一個新編號檔案,進行寫入。
什麼時候持久化檔案到硬碟
呼叫sync()
方法會持久化檔案到硬碟,然後重新開啟一個新編號檔案,進行寫入。
有幾個地方呼叫會呼叫這個方法:
- 一個寫入檔案的條數達到了
syncEvery
的值時,也就是初始化時設定的最大的條數。會呼叫sync()
syncTimeout
初始化時設定的同步時間間隔,如果這個時間間隔到了,並且寫入的檔案條數>0的時候,會呼叫sync()
- 還有就是上面說過的
writeOne
方法,寫入完訊息後,會判斷當前的檔案大小是否已經已於maxBytesPerFile
如果大,會呼叫sync()
- 當讀取檔案時,把整個檔案讀取完時,會刪除這個檔案並且會把
needSync
設定為true
,ioLoop
會呼叫sync()
- 還有就是
Close
的時候,會呼叫sync()
func (d *diskQueue) sync() error {
if d.writeFile != nil {
// 把資料 flash到硬碟,關閉檔案並設定為 nil
err := d.writeFile.Sync()
if err != nil {
d.writeFile.Close()
d.writeFile = nil
return err
}
}
// 儲存元資料資訊
err := d.persistMetaData()
// ...
d.needSync = false
return nil
}
讀取一條資料
元資料儲存著 讀取檔案的編號 readFileNum
和讀取資料的位置 readPos
並且diskQueue
暴露出了一個方法來,通過channel
來讀取資料
func (d *diskQueue) ReadChan() chan []byte {
return d.readChan
}
ioLoop
裡,當發現讀取位置小於寫入位置 或者讀檔案編號小於寫檔案編號,並且下一個讀取位置等於當前位置時才會讀取一條資料,然後放在一個外部全域性變數 dataRead
裡,並把 讀取的channel
賦值監聽 r = d.readChan
,當外部有人讀取了訊息,則進行moveForward
操作
func (d *diskQueue) ioLoop() {
var dataRead []byte
var err error
var count int64
var r chan []byte
for {
// ...
if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
if d.nextReadPos == d.readPos {
dataRead, err = d.readOne()
if err != nil {
d.handleReadError()
continue
}
}
r = d.readChan
} else {
r = nil
}
select {
// ...
case r <- dataRead:
count++
// moveForward sets needSync flag if a file is removed
d.moveForward()
// ...
}
}
// ...
}
readOne
從檔案裡讀取一條訊息,4個bit的大小,然後讀取具體的訊息。如果讀取位置大於最大檔案限制,則close。在moveForward裡會進行刪除操作
func (d *diskQueue) readOne() ([]byte, error) {
var err error
var msgSize int32
// 如果readFile是nil,開啟一個新的
if d.readFile == nil {
curFileName := d.fileName(d.readFileNum)
d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
// ...
d.reader = bufio.NewReader(d.readFile)
}
err = binary.Read(d.reader, binary.BigEndian, &msgSize)
// ...
readBuf := make([]byte, msgSize)
_, err = io.ReadFull(d.reader, readBuf)
totalBytes := int64(4 + msgSize)
// ...
d.nextReadPos = d.readPos + totalBytes
d.nextReadFileNum = d.readFileNum
// 如果讀取位置大於最大檔案限制,則close。在moveForward裡會進行刪除操作
if d.nextReadPos > d.maxBytesPerFile {
if d.readFile != nil {
d.readFile.Close()
d.readFile = nil
}
d.nextReadFileNum++
d.nextReadPos = 0
}
return readBuf, nil
}
moveForward
方法會檢視讀取的編號,如果發現下一個編號 和當前的編號不同時,則刪除舊的檔案。
func (d *diskQueue) moveForward() {
oldReadFileNum := d.readFileNum
d.readFileNum = d.nextReadFileNum
d.readPos = d.nextReadPos
depth := atomic.AddInt64(&d.depth, -1)
// see if we need to clean up the old file
if oldReadFileNum != d.nextReadFileNum {
// sync every time we start reading from a new file
d.needSync = true
fn := d.fileName(oldReadFileNum)
err := os.Remove(fn)
// ...
}
d.checkTailCorruption(depth)