1. 程式人生 > >時間序列資料庫KDB 與Java結合使用介紹 -- 1 KDB Java程式碼解讀

時間序列資料庫KDB 與Java結合使用介紹 -- 1 KDB Java程式碼解讀

KDB是Kx System開發的時間序列資料庫,通常用於處理交易行情相關資料。具體介紹可以參考:https://en.wikipedia.org/wiki/Kdb%2B。

在我們的計價系統中使用kdb來儲存計價資料,由於KDB是基於Q語言的,我們的計價系統是Java寫的,所以使用了KDB提供的Java API。

KDB的Java實現在KDB原始碼有下載。其中最核心的是c.java,我們先來看看它的實現。

c.java的使用需要通過建立一個新的c物件,以c的建構函式為例:

public C(final String h, final int p, final String u, final boolean useTLS)
            throws KException, IOException {
        B = new byte[2 + C.ns(u)];
        s = new Socket(h, p);
        if (useTLS) {
            s = ((SSLSocketFactory) SSLSocketFactory.getDefault())
                    .createSocket(s, h, p, true);
            ((SSLSocket) s).startHandshake();
        }
        io(s);
        J = 0;
        w(u + "\3");
        o.write(B);
        if (1 != i.read(B, 0, 1)) {
            close();
            B = new byte[1 + C.ns(u)];
            io(new Socket(h, p));
            J = 0;
            w(u);
            o.write(B);
            if (1 != i.read(B, 0, 1)) {
                close();
                throw new KException("access");
            }
        }
        vt = Math.min(B[0], 3);
    }

它通過引數kdb host: h,kdb port: p,kdb user&pass: u 以及是否使用安全傳輸: useTLS和遠端kdb伺服器進行通訊,建立連線並且登陸,通常u是由username:password的形式傳輸。可以看到通過socket給kdb傳輸資料。
 void io(final Socket x) throws IOException {
        s = x;
        s.setTcpNoDelay(true);
        {
            final InetAddress a = s.getInetAddress();
            l = a.isAnyLocalAddress() || a.isLoopbackAddress();
        }
        i = new DataInputStream(s.getInputStream());
        o = s.getOutputStream();
        s.setKeepAlive(true);
    }

那麼一般使用什麼函式往kdb裡面插入資料呢?這裡有2種實現,同步(public Object k(String s) throws KException, IOException)和非同步(public void ks(String s) throws IOException),同步非同步還有過載方法直接寫入Object物件,我們系統寫入的是完整的計價資料,所以必須先自己打包成String。同步和非同步的實現差別就在於同步在往Socket寫入資料後會等待kdb伺服器返回結果。
      public void ks(final String s) throws IOException {
        w(0, cs(s));
    }
    public synchronized Object k(final Object x) throws KException, IOException {
        w(1, x);
        return k();
    }


    public Object k(final String s) throws KException, IOException {
        return k(cs(s));
    }
      protected void w(final int i, final Object x) throws IOException {
        final int n = nx(x) + 8;
        synchronized (o) {
            B = new byte[n];
            B[0] = 0;
            B[1] = (byte) i;
            J = 4;
            w(n);
            w(x);
            if (zip && J > 2000 && !l) {
                z();
            }
            o.write(B, 0, J);
        }
    }

這裡有一點非常值得注意,這些方法的執行是同步的!!!在最開始我們的實現中,使用了多執行緒往kdb寫入資料,而Connection我們只使用了一個C例項,這樣意義就不是很大了。在我們想對實現做修改的時候,和kdb服務端維護人員溝通,發現kdb預設的實現就是單執行緒寫入的,只支援多執行緒查詢,單執行緒寫入。所以我們改變多執行緒變成單執行緒,後面再說說相關實現細節。
single-thread which requires positive number of port
multi-thread which requires negative number of port
下一篇將給出一個簡單的寫使用案例。