1. 程式人生 > >Java讀取Level-1行情dbf檔案極致優化(2)

Java讀取Level-1行情dbf檔案極致優化(2)

最近架構一個專案,實現行情的接入和分發,需要達到極致的低時延特性,這對於證券系統是非常重要的。接入的行情源是可以配置,既可以是Level-1,也可以是Level-2或其他第三方的源。雖然Level-1行情沒有Level-2快,但是作為系統支援的行情源,我們還是需要優化它,使得從檔案讀取,到使用者通過socket收到行情,端到端的時延儘可能的低。本文主要介紹對level-1行情dbf檔案讀取的極致優化方案。相信對其他的dbf檔案讀取應該也有借鑑意義。

Level-1行情是由行情小站,定時每隔幾秒把dbf檔案(上海是show2003.dbf,深圳是sjshq.dbf)更新一遍,用新的行情替換掉舊的。我們的目標就是,在新檔案完成更新後,在最短時間內將檔案讀取到記憶體,把每一行轉化為物件,把每個列轉化為對應的資料型別。

我們一共採用了6種優化方式。

優化一:採用記憶體硬碟(RamDisk)

優化二:採用JNotify,用通知替代輪詢

本文繼續介紹:

優化三:採用NIO讀取檔案

對於Dbf檔案的讀寫,有許多的開源的實現,選擇和改進它們是這裡的重要策略。

有許多Dbf庫是基於流的I/O實現的,即InputStream和OutStream。我們應該採用NIO的方式,即基於RandomAccessFile,FileChannel和ByteBuffer。流的方式是一邊處理資料,一邊從檔案中讀取,而採用NIO可以一次性把整個檔案載入到記憶體中。有測試表明(見《Java程式效能優化》一書),NIO的方式大概比流的方式快5倍左右。我這裡提供採用NIO實現的dbf讀取庫供大家下載學習(最原始的出處已不可考了。這個程式碼被改寫了,其中也已經包含我之後將要提出的優化策略),如果你的專案已經有dbf庫,建議基於本文的優化策略進行改進,而不是直接替換為我提供的。

其中,DBFReader.java中有如下程式碼片段:

建立FileChannel程式碼為:

this.dbf = new RandomAccessFile(file, "r");
this.fc = dbf.getChannel();

把指定的檔案片段載入到ByteBuffer的程式碼為

private ByteBuffer loadData(int offset, int length) throws IOException {
        // return fc.map(MapMode.READ_ONLY, offset, length).load();
        ByteBuffer b = ByteBuffer.allocateDirect(length);
        fc.position(offset);
        fc.read(b);
        b.rewind();
        
return b; }

以上,我們使用ByteBuffer.allocateDirect(length)建立ByteBuffer。 allocateDirect方法建立的是DirectBuffer,DirectBuffer分配在”核心快取區”,比普通的ByteBuffer快一倍,這也有利於我們程式的優化。但是DirectBuffer的建立和銷燬更耗時,在我們接下來的優化中將要解決這一問題。

(我不打算詳細介紹NIO的相關知識(可能我也講不清楚),也不打算詳細介紹DbfReader.java的程式碼,只重點講解和效能相關的部分,接下來也是如此。)

優化四:減少讀取檔案時記憶體反覆分配和GC

以上我提供的DBFReader.java檔案讀取的檔案的基本步驟是 :

1,把整個檔案(除了檔案頭)讀取到ByteBuffer當中(其實為DirectBuffer)

2,再把每一行從ByteBuffer讀取到一個個byte[]陣列中。

3,把這些byte[]陣列封裝在一個一個Record物件中(Record物件提供了從byte[]中讀取列的各種方法)。

見以下loadRecordsWithOutDel方法:

private List<Record> loadRecordsWithOutDel() throws IOException {

        ByteBuffer bb = loadData(getDataIndex(), getCount() * getRecordLength());

        List<Record> rds = new ArrayList<Record>(getCount());
        for (int i = 0; i < getCount(); i++) {
            
            bb.get(b);

            if ((char) b[0] != '*') {
                Record r = new Record(b);
                rds.add(r);
            }
        }

        bb.clear();

        return rds;
    }
private ByteBuffer loadData(int offset, int length) throws IOException {
        // return fc.map(MapMode.READ_ONLY, offset, length).load();
        
        fc.position(offset);
        fc.read(b);
        b.rewind();
        return b;

    }

考慮到我們系統的實際應用的情況:行情dbf檔案每隔幾秒就會重新整理一遍,重新整理後的大小基本上差不多,格式是完全一樣的,每行的大小是一樣的。

注意看以上程式碼中高亮的部分,會反覆建立ByteBuffer和byte陣列。在我們的應用場景下,完全可以使用一種快取機制來重複使用他們,避免反覆建立。要知道一個行情檔案有5000多行之多,避免如此之多的new和GC,肯定對效能有好處。

我添加了一個CacheManager類來完成這個工作:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class CacheManager {
    
    private ByteBuffer byteBuffer = null;
    private int bufSize = 0;
    
    private List<byte[]> byteArrayList = null;
    private int bytesSize = 0;
    
    public CacheManager()
    {
    }
    
    public ByteBuffer getByteBuffer(int size)
    {
        if(this.bufSize < size)
        {
            byteBuffer = ByteBuffer.allocateDirect(size + 1024*8); //多分配一些,避免下次重新分配
            this.bufSize = size + 1024*8;
        }
        byteBuffer.clear();
        return byteBuffer;
    }
    
    public List<byte[]> getByteArrayList(int rowNum, int byteLength) //rowNum為行數,即需要的byte[]數量,byteLength為byte陣列的大小 
    {
        if(this.bytesSize!=byteLength)
        {
            byteArrayList = new ArrayList<byte[]>();
            this.bytesSize = byteLength;
        }
        
        if(byteArrayList.size() < rowNum)
        {
            int shouldAddRowCount = rowNum - byteArrayList.size()+100; //多分配100行
            for(int i=0; i<shouldAddRowCount; i++) 
            {
                byteArrayList.add(new byte[bytesSize]);
            }
        }
        
        return byteArrayList;
    }
    
}

CacheManager 管理了一個可以反覆使用的ByteBuffer,以及可以反覆使用的byte[]列表。

其中,getByteBuffer方法用於返回一個快取的ByteBuffer。其只有當快取的ByteBuffer小於指定的大小時,才重新建立ByteBuffer。(為了儘量避免這種情況,我們總是分配比實際需要大一些的ByteBuffer)。

其中,getByteArrayList方法用於返回快取的byte[]列表。其只有當需要的Byte[]數量小於需要的數量時,建立更多的byte[]; 如果快取的byte[]們的長度和需要的不符,就重新建立所有的byte[](這種情況不可能發生,因為每行的大小不會變,程式碼只是以防萬一而已)。

將loadRecordsWithOutDel改造為recordsWithOutDel_efficiently,採用快取機制:

public List<byte[]> recordsWithOutDel_efficiently(CacheManager cacheManager) throws IOException {

        ByteBuffer bb = 
        fc.position(getDataIndex());
        fc.read(bb);
        bb.rewind();
        List<byte[]> rds = new ArrayList<byte[]>(getCount());
        List<byte[]> byteArrayList =for (int i = 0; i < getCount(); i++) {
            byte[] b = byteArrayList.get(i);
            bb.get(b);

            if ((char) b[0] != '*') {
                rds.add(b);
            }
        }

        bb.clear();
        return rds;
    }

在新的recordsWithOutDel_efficiently中,我們從CacheManager中分配快取的ByteBuffer和快取的byte[]。而不是從系統分配,從而減少了反覆的記憶體分配和GC。(另外,recordsWithOutDel_efficiently直接返回byte[]列表,而不是Record列表了)

我的測試發現,優化步驟四,即使用快取的方式,大概把時間從5ms左右降到了2ms多,提高大概一倍。

到此,我們只是完成了檔案到記憶體的讀取。接著是為每一行建立一個行情物件,從byte[]中把每一列資料讀取出來。  我發現,其耗時遠遠超過檔案讀取,在沒有優化的情況下,對5000多行資料的轉換超過70ms。這是我們接下來需要介紹的優化策略。

待續。。。