Java讀取Level-1行情dbf檔案極致優化(2)
最近架構一個專案,實現行情的接入和分發,需要達到極致的低時延特性,這對於證券系統是非常重要的。接入的行情源是可以配置,既可以是Level-1,也可以是Level-2或其他第三方的源。雖然Level-1行情沒有Level-2快,但是作為系統支援的行情源,我們還是需要優化它,使得從檔案讀取,到使用者通過socket收到行情,端到端的時延儘可能的低。本文主要介紹對level-1行情dbf檔案讀取的極致優化方案。相信對其他的dbf檔案讀取應該也有借鑑意義。
Level-1行情是由行情小站,定時每隔幾秒把dbf檔案(上海是show2003.dbf,深圳是sjshq.dbf)更新一遍,用新的行情替換掉舊的。我們的目標就是,在新檔案完成更新後,在最短時間內將檔案讀取到記憶體,把每一行轉化為物件,把每個列轉化為對應的資料型別。
我們一共採用了6種優化方式。
優化一:採用記憶體硬碟(RamDisk)
優化二:採用JNotify,用通知替代輪詢
本文繼續介紹:
優化三:採用NIO讀取檔案
對於Dbf檔案的讀寫,有許多的開源的實現,選擇和改進它們是這裡的重要策略。
有許多Dbf庫是基於流的I/O實現的,即InputStream和OutStream。我們應該採用NIO的方式,即基於RandomAccessFile,FileChannel和ByteBuffer。流的方式是一邊處理資料,一邊從檔案中讀取,而採用NIO可以一次性把整個檔案載入到記憶體中。有測試表明(見《Java程式效能優化》一書),NIO的方式大概比流的方式快5倍左右。我這裡提供採用NIO實現的dbf讀取庫供大家下載學習(最原始的出處已不可考了。這個程式碼被改寫了,其中也已經包含我之後將要提出的優化策略),如果你的專案已經有dbf庫,建議基於本文的優化策略進行改進,而不是直接替換為我提供的。
其中,DBFReader.java中有如下程式碼片段:
建立FileChannel程式碼為:
this.dbf = new RandomAccessFile(file, "r"); this.fc = dbf.getChannel();
把指定的檔案片段載入到ByteBuffer的程式碼為
private ByteBuffer loadData(int offset, int length) throws IOException { // return fc.map(MapMode.READ_ONLY, offset, length).load(); ByteBuffer b = ByteBuffer.allocateDirect(length); fc.position(offset); fc.read(b); b.rewind();return b; }
以上,我們使用ByteBuffer.allocateDirect(length)建立ByteBuffer。 allocateDirect方法建立的是DirectBuffer,DirectBuffer分配在”核心快取區”,比普通的ByteBuffer快一倍,這也有利於我們程式的優化。但是DirectBuffer的建立和銷燬更耗時,在我們接下來的優化中將要解決這一問題。
(我不打算詳細介紹NIO的相關知識(可能我也講不清楚),也不打算詳細介紹DbfReader.java的程式碼,只重點講解和效能相關的部分,接下來也是如此。)
優化四:減少讀取檔案時記憶體反覆分配和GC
以上我提供的DBFReader.java檔案讀取的檔案的基本步驟是 :
1,把整個檔案(除了檔案頭)讀取到ByteBuffer當中(其實為DirectBuffer)
2,再把每一行從ByteBuffer讀取到一個個byte[]陣列中。
3,把這些byte[]陣列封裝在一個一個Record物件中(Record物件提供了從byte[]中讀取列的各種方法)。
見以下loadRecordsWithOutDel方法:
private List<Record> loadRecordsWithOutDel() throws IOException { ByteBuffer bb = loadData(getDataIndex(), getCount() * getRecordLength()); List<Record> rds = new ArrayList<Record>(getCount()); for (int i = 0; i < getCount(); i++) { bb.get(b); if ((char) b[0] != '*') { Record r = new Record(b); rds.add(r); } } bb.clear(); return rds; }
private ByteBuffer loadData(int offset, int length) throws IOException { // return fc.map(MapMode.READ_ONLY, offset, length).load(); fc.position(offset); fc.read(b); b.rewind(); return b; }
考慮到我們系統的實際應用的情況:行情dbf檔案每隔幾秒就會重新整理一遍,重新整理後的大小基本上差不多,格式是完全一樣的,每行的大小是一樣的。
注意看以上程式碼中高亮的部分,會反覆建立ByteBuffer和byte陣列。在我們的應用場景下,完全可以使用一種快取機制來重複使用他們,避免反覆建立。要知道一個行情檔案有5000多行之多,避免如此之多的new和GC,肯定對效能有好處。
我添加了一個CacheManager類來完成這個工作:
import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; public class CacheManager { private ByteBuffer byteBuffer = null; private int bufSize = 0; private List<byte[]> byteArrayList = null; private int bytesSize = 0; public CacheManager() { } public ByteBuffer getByteBuffer(int size) { if(this.bufSize < size) { byteBuffer = ByteBuffer.allocateDirect(size + 1024*8); //多分配一些,避免下次重新分配 this.bufSize = size + 1024*8; } byteBuffer.clear(); return byteBuffer; } public List<byte[]> getByteArrayList(int rowNum, int byteLength) //rowNum為行數,即需要的byte[]數量,byteLength為byte陣列的大小 { if(this.bytesSize!=byteLength) { byteArrayList = new ArrayList<byte[]>(); this.bytesSize = byteLength; } if(byteArrayList.size() < rowNum) { int shouldAddRowCount = rowNum - byteArrayList.size()+100; //多分配100行 for(int i=0; i<shouldAddRowCount; i++) { byteArrayList.add(new byte[bytesSize]); } } return byteArrayList; } }
CacheManager 管理了一個可以反覆使用的ByteBuffer,以及可以反覆使用的byte[]列表。
其中,getByteBuffer方法用於返回一個快取的ByteBuffer。其只有當快取的ByteBuffer小於指定的大小時,才重新建立ByteBuffer。(為了儘量避免這種情況,我們總是分配比實際需要大一些的ByteBuffer)。
其中,getByteArrayList方法用於返回快取的byte[]列表。其只有當需要的Byte[]數量小於需要的數量時,建立更多的byte[]; 如果快取的byte[]們的長度和需要的不符,就重新建立所有的byte[](這種情況不可能發生,因為每行的大小不會變,程式碼只是以防萬一而已)。
將loadRecordsWithOutDel改造為recordsWithOutDel_efficiently,採用快取機制:
public List<byte[]> recordsWithOutDel_efficiently(CacheManager cacheManager) throws IOException { ByteBuffer bb = fc.position(getDataIndex()); fc.read(bb); bb.rewind(); List<byte[]> rds = new ArrayList<byte[]>(getCount()); List<byte[]> byteArrayList =for (int i = 0; i < getCount(); i++) { byte[] b = byteArrayList.get(i); bb.get(b); if ((char) b[0] != '*') { rds.add(b); } } bb.clear(); return rds; }
在新的recordsWithOutDel_efficiently中,我們從CacheManager中分配快取的ByteBuffer和快取的byte[]。而不是從系統分配,從而減少了反覆的記憶體分配和GC。(另外,recordsWithOutDel_efficiently直接返回byte[]列表,而不是Record列表了)
我的測試發現,優化步驟四,即使用快取的方式,大概把時間從5ms左右降到了2ms多,提高大概一倍。
到此,我們只是完成了檔案到記憶體的讀取。接著是為每一行建立一個行情物件,從byte[]中把每一列資料讀取出來。 我發現,其耗時遠遠超過檔案讀取,在沒有優化的情況下,對5000多行資料的轉換超過70ms。這是我們接下來需要介紹的優化策略。
待續。。。