Zookeeper原始碼分析之持久化(一)
阿新 • • 發佈:2019-02-15
一、前言
持久化對於資料的儲存至關重要,下面進行詳細分析。二、持久化總體框架
持久化的類主要在包org.apache.zookeeper.server.persistence下,此次也主要是對其下的類進行分析,其包下總體的類結構如下圖所示。
· TxnLog,介面型別,讀取事務性日誌的介面。
· FileTxnLog,實現TxnLog介面,添加了訪問該事務性日誌的API。
· Snapshot,介面型別,持久層快照介面。
· FileSnap,實現Snapshot介面,負責儲存、序列化、反序列化、訪問快照。
· FileTxnSnapLog,封裝了TxnLog和SnapShot。
· Util,工具類,提供持久化所需的API。
下面先來分析TxnLog和FileTxnLog的原始碼。
三、TxnLog原始碼分析
TxnLog是介面,規定了對日誌的響應操作。其中,TxnLog除了提供讀寫事務日誌的API外,還提供了一個用於讀取日誌的迭代器介面TxnIterator。public interface TxnLog { /** * roll the current * log being appended to * @throws IOException */ // 回滾日誌 void rollLog() throws IOException; /** * Append a request to the transaction log * @param hdr the transaction header * @param r the transaction itself * returns true iff something appended, otw false * @throws IOException */ // 新增一個請求至事務性日誌 boolean append(TxnHeader hdr, Record r) throws IOException; /** * Start reading the transaction logs * from a given zxid * @param zxid * @return returns an iterator to read the * next transaction in the logs. * @throws IOException */ // 讀取事務性日誌 TxnIterator read(long zxid) throws IOException; /** * the last zxid of the logged transactions. * @return the last zxid of the logged transactions. * @throws IOException */ // 事務性操作的最新zxid long getLastLoggedZxid() throws IOException; /** * truncate the log to get in sync with the * leader. * @param zxid the zxid to truncate at. * @throws IOException */ // 清空日誌,與Leader保持同步 boolean truncate(long zxid) throws IOException; /** * the dbid for this transaction log. * @return the dbid for this transaction log. * @throws IOException */ // 獲取資料庫的id long getDbId() throws IOException; /** * commmit the trasaction and make sure * they are persisted * @throws IOException */ // 提交事務並進行確認 void commit() throws IOException; /** * close the transactions logs */ // 關閉事務性日誌 void close() throws IOException; /** * an iterating interface for reading * transaction logs. */ // 讀取事務日誌的迭代器介面 public interface TxnIterator { /** * return the transaction header. * @return return the transaction header. */ // 獲取事務頭部 TxnHeader getHeader(); /** * return the transaction record. * @return return the transaction record. */ // 獲取事務 Record getTxn(); /** * go to the next transaction record. * @throws IOException */ // 下個事務 boolean next() throws IOException; /** * close files and release the * resources * @throws IOException */ // 關閉檔案釋放資源 void close() throws IOException; } }
四、FileTxnLog原始碼分析
對於LogFile而言,其格式可分為如下三部分LogFile:
FileHeader TxnList ZeroPad
FileHeader格式如下
FileHeader: {
magic 4bytes (ZKLG)
version 4bytes
dbid 8bytes
}
TxnList格式如下
TxnList:
Txn || Txn TxnList
Txn格式如下
Txn:
checksum Txnlen TxnHeader Record 0x42
Txnlen格式如下
Txnlen:
len 4bytes
TxnHeader格式如下
TxnHeader: {
sessionid 8bytes
cxid 4bytes
zxid 8bytes
time 8bytes
type 4bytes
}
ZeroPad格式如下
ZeroPad:
0 padded to EOF (filled during preallocation stage)
瞭解LogFile的格式對於理解原始碼會有很大的幫助。
4.1 屬性
4.2. 核心函式public class FileTxnLog implements TxnLog { private static final Logger LOG; // 預分配大小 64M static long preAllocSize = 65536 * 1024; // 魔術數字,預設為1514884167 public final static int TXNLOG_MAGIC = ByteBuffer.wrap("ZKLG".getBytes()).getInt(); // 版本號 public final static int VERSION = 2; /** Maximum time we allow for elapsed fsync before WARNing */ // 進行同步時,發出warn之前所能等待的最長時間 private final static long fsyncWarningThresholdMS; // 靜態屬性,確定Logger、預分配空間大小和最長時間 static { LOG = LoggerFactory.getLogger(FileTxnLog.class); String size = System.getProperty("zookeeper.preAllocSize"); if (size != null) { try { preAllocSize = Long.parseLong(size) * 1024; } catch (NumberFormatException e) { LOG.warn(size + " is not a valid value for preAllocSize"); } } fsyncWarningThresholdMS = Long.getLong("fsync.warningthresholdms", 1000); } // 最大(新)的zxid long lastZxidSeen; // 儲存資料相關的流 volatile BufferedOutputStream logStream = null; volatile OutputArchive oa; volatile FileOutputStream fos = null; // log目錄檔案 File logDir; // 是否強制同步 private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");; // 資料庫id long dbId; // 流列表 private LinkedList<FileOutputStream> streamsToFlush = new LinkedList<FileOutputStream>(); // 當前大小 long currentSize; // 寫日誌檔案 File logFileWrite = null; }
1. append函式
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
if (hdr != null) { // 事務頭部不為空
if (hdr.getZxid() <= lastZxidSeen) { // 事務的zxid小於等於最後的zxid
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
}
if (logStream==null) { // 日誌流為空
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: log." +
Long.toHexString(hdr.getZxid()));
}
//
logFileWrite = new File(logDir, ("log." +
Long.toHexString(hdr.getZxid())));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
//
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
// 序列化
fhdr.serialize(oa, "fileheader");
// Make sure that the magic number is written before padding.
// 重新整理到磁碟
logStream.flush();
// 當前通道的大小
currentSize = fos.getChannel().position();
// 新增fos
streamsToFlush.add(fos);
}
// 填充檔案
padFile(fos);
// Serializes transaction header and transaction data into a byte buffer.
// 將事務頭和事務資料序列化成Byte Buffer
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) { // 為空,丟擲異常
throw new IOException("Faulty serialization for header " +
"and txn");
}
// 生成一個驗證演算法
Checksum crc = makeChecksumAlgorithm();
// Updates the current checksum with the specified array of bytes
// 使用Byte陣列來更新當前的Checksum
crc.update(buf, 0, buf.length);
// 寫long型別資料
oa.writeLong(crc.getValue(), "txnEntryCRC");
// Write the serialized transaction record to the output archive.
// 將序列化的事務記錄寫入OutputArchive
Util.writeTxnBytes(oa, buf);
return true;
}
return false;
}
說明:append函式主要用做向事務日誌中新增一個條目,其大體步驟如下① 檢查TxnHeader是否為空,若不為空,則進入②,否則,直接返回false
② 檢查logStream是否為空(初始化為空),若不為空,則進入③,否則,進入⑤
③ 初始化寫資料相關的流和FileHeader,並序列化FileHeader至指定檔案,進入④
④ 強制重新整理(保證資料存到磁碟),並獲取當前寫入資料的大小。進入⑤
⑤ 填充資料,填充0,進入⑥
⑥ 將事務頭和事務序列化成ByteBuffer(使用Util.marshallTxnEntry函式),進入⑦
⑦ 使用Checksum演算法更新步驟⑥的ByteBuffer。進入⑧
⑧ 將更新的ByteBuffer寫入磁碟檔案,返回true
append間接呼叫了padLog函式,其原始碼如下
public static long padLogFile(FileOutputStream f,long currentSize,
long preAllocSize) throws IOException{
// 獲取位置
long position = f.getChannel().position();
if (position + 4096 >= currentSize) { // 計算後是否大於當前大小
// 重新設定當前大小,剩餘部分填充0
currentSize = currentSize + preAllocSize;
fill.position(0);
f.getChannel().write(fill, currentSize-fill.remaining());
}
return currentSize;
}
說明:其主要作用是當檔案大小不滿64MB時,向檔案填充0以達到64MB大小。2. getLogFiles函式
public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
// 按照zxid對檔案進行排序
List<File> files = Util.sortDataDir(logDirList, "log", true);
long logZxid = 0;
// Find the log file that starts before or at the same time as the
// zxid of the snapshot
for (File f : files) { // 遍歷檔案
// 從檔案中獲取zxid
long fzxid = Util.getZxidFromName(f.getName(), "log");
if (fzxid > snapshotZxid) { // 跳過大於snapshotZxid的檔案
continue;
}
// the files
// are sorted with zxid's
if (fzxid > logZxid) { // 找出檔案中最大的zxid(同時還需要小於等於snapshotZxid)
logZxid = fzxid;
}
}
// 檔案列表
List<File> v=new ArrayList<File>(5);
for (File f : files) { // 再次遍歷檔案
// 從檔案中獲取zxid
long fzxid = Util.getZxidFromName(f.getName(), "log");
if (fzxid < logZxid) { // 跳過小於logZxid的檔案
continue;
}
// 新增
v.add(f);
}
// 轉化成File[] 型別後返回
return v.toArray(new File[0]);
}
說明:該函式的作用是找出剛剛小於或者等於snapshot的所有log檔案。其步驟大致如下。① 對所有log檔案按照zxid進行升序排序,進入②
② 遍歷所有log檔案並記錄剛剛小於或等於給定snapshotZxid的log檔案的logZxid,進入③
③ 再次遍歷log檔案,新增zxid大於等於步驟②中的logZxid的所有log檔案,進入④
④ 轉化後返回
getLogFiles函式呼叫了sortDataDir,其原始碼如下:
public static List<File> sortDataDir(File[] files, String prefix, boolean ascending)
{
if(files==null)
return new ArrayList<File>(0);
// 轉化為列表
List<File> filelist = Arrays.asList(files);
// 進行排序,Comparator是關鍵,根據zxid進行排序
Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
return filelist;
}
說明:其用於排序log檔案,可以選擇根據zxid進行升序或降序。getLogFiles函式間接呼叫了getZxidFromName,其原始碼如下:
// 從檔名中解析出zxid
public static long getZxidFromName(String name, String prefix) {
long zxid = -1;
// 對檔名進行分割
String nameParts[] = name.split("\\.");
if (nameParts.length == 2 && nameParts[0].equals(prefix)) { // 字首相同
try {
// 轉化成長整形
zxid = Long.parseLong(nameParts[1], 16);
} catch (NumberFormatException e) {
}
}
return zxid;
}
說明:getZxidFromName主要用作從檔名中解析zxid,並且需要從指定的字首開始。3. getLastLoggedZxid函式
public long getLastLoggedZxid() {
// 獲取已排好序的所有的log檔案
File[] files = getLogFiles(logDir.listFiles(), 0);
// 獲取最大的zxid(最後一個log檔案對應的zxid)
long maxLog=files.length>0?
Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;
// if a log file is more recent we must scan it to find
// the highest zxid
//
long zxid = maxLog;
// 迭代器
TxnIterator itr = null;
try {
// 新生FileTxnLog
FileTxnLog txn = new FileTxnLog(logDir);
// 開始讀取從給定zxid之後的所有事務
itr = txn.read(maxLog);
while (true) { // 遍歷
if(!itr.next()) // 是否存在下一項
break;
// 獲取事務頭
TxnHeader hdr = itr.getHeader();
// 獲取zxid
zxid = hdr.getZxid();
}
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
} finally {
// 關閉迭代器
close(itr);
}
return zxid;
}
說明:該函式主要用於獲取記錄在log中的最後一個zxid。其步驟大致如下① 獲取已排好序的所有log檔案,並從最後一個檔案中取出zxid作為候選的最大zxid,進入②
② 新生成FileTxnLog並讀取步驟①中zxid之後的所有事務,進入③
③ 遍歷所有事務並提取出相應的zxid,最後返回。
其中getLastLoggedZxid呼叫了read函式,其原始碼如下
public TxnIterator read(long zxid) throws IOException {
// 返回事務檔案訪問迭代器
return new FileTxnIterator(logDir, zxid);
}
說明:read函式會生成一個FileTxnIterator,其是TxnLog.TxnIterator的子類,之後在FileTxnIterator建構函式中會呼叫init函式,其原始碼如下 void init() throws IOException {
// 新生成檔案列表
storedFiles = new ArrayList<File>();
// 進行排序
List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);
for (File f: files) { // 遍歷檔案
if (Util.getZxidFromName(f.getName(), "log") >= zxid) { // 新增zxid大於等於指定zxid的檔案
storedFiles.add(f);
}
// add the last logfile that is less than the zxid
else if (Util.getZxidFromName(f.getName(), "log") < zxid) { // 只新增一個zxid小於指定zxid的檔案,然後退出
storedFiles.add(f);
break;
}
}
// go to the next logfile
// 進入下一個log檔案
goToNextLog();
if (!next()) // 不存在下一項,返回
return;
while (hdr.getZxid() < zxid) { // 從事務頭中獲取zxid小於給定zxid,直到不存在下一項或者大於給定zxid時退出
if (!next())
return;
}
}
說明:init函式用於進行初始化操作,會根據zxid的不同進行不同的初始化操作,在init函式中會呼叫goToNextLog函式,其原始碼如下 private boolean goToNextLog() throws IOException {
if (storedFiles.size() > 0) { // 儲存的檔案列表大於0
// 取最後一個log檔案
this.logFile = storedFiles.remove(storedFiles.size()-1);
// 針對該檔案,建立InputArchive
ia = createInputArchive(this.logFile);
// 返回true
return true;
}
return false;
}
說明:goToNextLog表示選取下一個log檔案,在init函式中還呼叫了next函式,其原始碼如下 public boolean next() throws IOException {
if (ia == null) { // 為空,返回false
return false;
}
try {
// 讀取長整形crcValue
long crcValue = ia.readLong("crcvalue");
// 通過input archive讀取一個事務條目
byte[] bytes = Util.readTxnBytes(ia);
// Since we preallocate, we define EOF to be an
if (bytes == null || bytes.length==0) { // 對bytes進行判斷
throw new EOFException("Failed to read " + logFile);
}
// EOF or corrupted record
// validate CRC
// 驗證CRC
Checksum crc = makeChecksumAlgorithm();
// 更新
crc.update(bytes, 0, bytes.length);
if (crcValue != crc.getValue()) // 驗證不相等,丟擲異常
throw new IOException(CRC_ERROR);
if (bytes == null || bytes.length == 0) // bytes為空,返回false
return false;
// 新生成TxnHeader
hdr = new TxnHeader();
// 將Txn反序列化,並且將對應的TxnHeader反序列化至hdr,整個Record反序列化至record
record = SerializeUtils.deserializeTxn(bytes, hdr);
} catch (EOFException e) { // 丟擲異常
LOG.debug("EOF excepton " + e);
// 關閉輸入流
inputStream.close();
// 賦值為null
inputStream = null;
ia = null;
hdr = null;
// this means that the file has ended
// we should go to the next file
if (!goToNextLog()) { // 沒有log檔案,則返回false
return false;
}
// if we went to the next log file, we should call next() again
// 繼續呼叫next
return next();
} catch (IOException e) {
inputStream.close();
throw e;
}
// 返回true
return true;
}
說明:next表示將迭代器移動至下一個事務,方便讀取,next函式的步驟如下。① 讀取事務的crcValue值,用於後續的驗證,進入②
② 讀取事務,使用CRC32進行更新並與①中的結果進行比對,若不相同,則丟擲異常,否則,進入③
③ 將事務進行反序列化並儲存至相應的屬性中(如事務頭和事務體),會確定具體的事務操作型別。
④ 在讀取過程丟擲異常時,會首先關閉流,然後再嘗試呼叫next函式(即進入下一個事務進行讀取)。
4. commit函式
public synchronized void commit() throws IOException {
if (logStream != null) {
// 強制刷到磁碟
logStream.flush();
}
for (FileOutputStream log : streamsToFlush) { // 遍歷流
// 強制刷到磁碟
log.flush();
if (forceSync) { // 是否強制同步
long startSyncNS = System.nanoTime();
log.getChannel().force(false);
// 計算流式的時間
long syncElapsedMS =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
if (syncElapsedMS > fsyncWarningThresholdMS) { // 大於閾值時則會警告
LOG.warn("fsync-ing the write ahead log in "
+ Thread.currentThread().getName()
+ " took " + syncElapsedMS
+ "ms which will adversely effect operation latency. "
+ "See the ZooKeeper troubleshooting guide");
}
}
}
while (streamsToFlush.size() > 1) { // 移除流並關閉
streamsToFlush.removeFirst().close();
}
}
說明:該函式主要用於提交事務日誌至磁碟,其大致步驟如下① 若日誌流logStream不為空,則強制重新整理至磁碟,進入②
② 遍歷需要重新整理至磁碟的所有流streamsToFlush並進行重新整理,進入③
③ 判斷是否需要強制性同步,如是,則計算每個流的流式時間並在控制檯給出警告,進入④
④ 移除所有流並關閉。
5. truncate函式
public boolean truncate(long zxid) throws IOException {
FileTxnIterator itr = null;
try {
// 獲取迭代器
itr = new FileTxnIterator(this.logDir, zxid);
PositionInputStream input = itr.inputStream;
long pos = input.getPosition();
// now, truncate at the current position
// 從當前位置開始清空
RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
raf.setLength(pos);
raf.close();
while (itr.goToNextLog()) { // 存在下一個log檔案
if (!itr.logFile.delete()) { // 刪除
LOG.warn("Unable to truncate {}", itr.logFile);
}
}
} finally {
// 關閉迭代器
close(itr);
}
return true;
}
說明:該函式用於清空大於給定zxid的所有事務日誌。