nsq源碼閱讀筆記之nsqd(三)——diskQueue
diskQueue
是backendQueue
接口的一個實現。backendQueue
的作用是在實現在內存go channel緩沖區滿的情況下對消息的處理的對象。
除了diskQueue
外還有dummyBackendQueue
實現了backendQueue
接口。
對於臨時(#ephemeral結尾)Topic/Channel,在創建時會使用dummyBackendQueue
初始化backend
, dummyBackendQueue
只是為了統一臨時和非臨時Topic/Channel而寫的,它只是實現了接口,不做任何實質上的操作,
因此在內存緩沖區滿時直接丟棄消息。這也是臨時Topic/Channel和非臨時的一個比較大的差別。
每個非臨時Topic/Channel,創建的時候使用diskQueue
backend
,diskQueue
的功能是將消息寫入磁盤進行持久化, 並在需要時從中取出消息重新向客戶端投遞。
diskQueue
的實現在nsqd/disk_queue.go
中。需要註意一點,查找diskQueue
中的函數的調用可能不會返回正確的結果,
因為diskQueue
對外是以backendQueue
形式存在,因此查找diskQueue
的函數的調用情況時應當查找backendQueue
中相應函數的調用。
diskQueue
的創建和初始化
diskQueue
的獲得是通過newDiskQueue
,該函數比較簡單,通過傳入的參數創建一個dispQueue
,
然後通過retrieveMetaData
diskQueue
相關聯的Topic/Channel已經持久化的信息。最後啟動ioLoop
循環處理消息。
retrieveMetaData
函數從磁盤中恢復diskQueue
的狀態。diskQueue
會定時將自己的狀態備份到文件中,
文件名由metaDataFileName
函數確定。retrieveMetaData
函數同樣通過metaDataFileName
函數獲得保存狀態的文件名並打開。
該文件只有三行,格式為%d\n%d,%d\n%d,%d\n
,第一行保存著該diskQueue
中消息的數量(depth
),
第二行保存readFileNum
和readPos
,第三行保存writeFileNum
writePos
。這裏不太理解的一個地方是d.depth
通過一個臨時變量去獲取然後通過atomic.StoreInt64
保存。個人覺得沒有必要這麽做。
當然作者在nsqd: diskqueue corruption and depth accounting這個Pull Request中也提到:
I dont believe that this should be strictly necessary because
retrieveMetaData
is only ever called inNewDiskQueue
and theioLoop
goroutine is launched after that call (which according to the go memory model is safe).However, I’m not 100% sure about interactions between the go memory model, go-routines, and the combined use of atomic and non-atomic operations (which is what this was relying on before this change… i.e. this was the only mutation of
d.depth
that was notusing atomic ops).
因此,這只是個比較保險的做法,並不一定意味著直接保存到d.depth
就不安全。
與retrieveMetaData
相對應的是persistMetaData
函數,這個函數將運行時的元數據保存到文件用於下次重新構建diskQueue
時的恢復。
邏輯基本與retrieveMetaData
,此處不再贅述。
diskQueue
的消息循環
ioLoop
函數實現了diskQueue
的消息循環,diskQueue
的定時操作和讀寫操作的核心都在這個函數中完成。
函數首先使用time.NewTicker(d.syncTimeout)
定義了syncTicker
變量,syncTicker
的類型是time.Ticker
,
每隔d.syncTimeout
時間就會在syncTicker.C
這個go channel產生一個消息。
通過select syncTicker.C
能實現至多d.syncTimeout
時間就跳出select塊一次,這種方式相當於一個延時的default
子句。
在ioLoop
中,通過這種方式,就能在一個goroutine中既實現消息的接收又實現定時任務(跳出select後執行定時任務,然後在進入select)。
有點類似於定時的輪詢。
ioLoop
的定時任務是調用sync
函數刷新文件,防止突然結束程序後內存中的內容未被提交到磁盤,導致內容丟失。
控制是否需要同步的變量是d.needSync
,該變量在一次sync
後會被置為false
,在許多需要刷新文件的地方會被置為true
。
在ioLoop
中,d.needSync
變量還跟刷新計數器count
變量有關,count
值的變化規則如下:
- 如果一次消息循環中,有寫入操作,那麽
count
就會被自增。 - 當
count
達到d.syncEvery
時,會將count
重置為0並且將d.needSync
置為true
,隨後進行文件的刷新。 - 在
emptyChan
收到消息時,count
會被重置為0,因為文件已經被刪除了,所有要重置刷新計數器。 - 在
syncTicker.C
收到消息後,會將count
重置為0,並且將d.needSync
置為true
。也就是至多d.syncTimeout
時間刷新一次文件。
ioLoop
還定時檢測當前是否有數據需要被讀取,如果(d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos)
和`d.nextReadPos == d.readPos
這兩個條件成立,則執行d.readOne()
並將結果放入dataRead
中,然後設置r
為d.readChan
。
如果條件不成立,則將r
置為空值nil
。隨後的select語句中有case r <- dataRead:
這樣一個分支,在註釋中作者寫了這是一個Golang的特性,
即:如果r
不為空,則會將dataRead
送入go channel。進入d.readChan
的消息通過ReadChan
函數向外暴露,最終被Topic/Channel的消息循環讀取。
而如果r
為空,則這個分支會被跳過。這個特性的使用統一了select的邏輯,簡化了當數據為空時的判斷。
diskQueue
的寫操作
寫操作的對外接口是Put
函數,該函數比較簡單,加鎖,並且將數據放入d.writeChan
,等待d.writeResponseChan
的結果後返回。 d.writeChan
的接收在ioLoop
中select的一個分支,處理時調用writeOne
函數,並將處理結果放入d.writeResponseChan
。
writeOne
函數是寫操作的最終執行部分,負責將消息寫入磁盤。函數邏輯比較簡單。消息寫入步驟如下:
- 若當前要寫的文件不存在,則通過
d.fileName(d.writeFileNum)
獲得文件名,並創建文件 - 根據
d.writePos
定位本次寫的位置 - 從要寫入的內容得到要寫入的長度
- 先寫入3中計算出的消息長度(4字節),然後寫入消息本身
- 將
d.writePos
後移4 + 消息長度
作為下次寫入位置。加4是因為消息長度本身也占4字節。 - 判斷
d.writePos
是否大於每個文件的最大字節數d.maxBytesPerFile
,如果是,則將d.writeFileNum
加1,
並重置d.writePos
。這個操作的目的是為了防止單個文件過大。 - 如果下次要寫入新的文件,那麽需要調用
sync
函數對當前文件進行同步。
diskQueue
的讀操作
消息讀取對外暴露的是一個go channel,而數據的最終來源是ioLoop
中調用的readOne
函數。readOne
函數邏輯跟writeOne
類似,
只是把寫操作換成了讀操作,唯一差異較大的地方是d.nextReadPos
和d.nextReadFileNum
這兩個變量的使用。
在寫操作時,如果寫入成功,則可以直接將寫入位置和寫入文件更新。但是對於讀操作來說,由於讀取的目的是為了向客戶端投遞,
因此無法保證一定能投遞成功。因此需要使用next開頭的兩個變量來保存成功後需要讀的位置,如果投遞沒有成功,
則繼續使用當前的讀取位置將再一次嘗試將消息投遞給客戶端。
當消息投遞成功後,則使用moveForward
函數將保存在d.nextReadPos
和d.nextReadFileNum
中的值取出,
賦值給d.readPos
和d.readFileNum
,moveForward
函數還負責清理已經讀完的舊文件。最後,調用checkTailCorruption
函數檢查文件是否有錯,
如果出現錯誤,則調用skipToNextRWFile
重置讀取和寫入的文件編號和位置。
diskQueue
中的其他函數
diskQueue
中還有與錯誤處理相關的handleReadError
,與關閉diskQueue
相關的Close
,Delete
,exit
,Empty
和deleteAllFiles
等,
函數,邏輯較簡單,不再專門分析。
diskQueue
總結
diskQueue
主要邏輯是對磁盤的讀寫操作,較為瑣碎但沒有復雜的架構。
其中消息循環的思路和讀寫過程周全的考慮都值得學習的。
nsq源碼閱讀筆記之nsqd(三)——diskQueue