1. 程式人生 > >一致性Hash演算法在資料庫分表中的實踐

一致性Hash演算法在資料庫分表中的實踐

最近有一個專案,其中某個功能單表資料在可預估的未來達到了億級,初步估算在90億左右。與同事詳細討論後,決定採用一致性Hash演算法來完成資料庫的自動擴容和資料遷移。整個程式細節由我同事完成,我只是將其理解併成文,供有相同問題的同行參考。

參看此文的兄弟,預設各位已經熟悉一致性hash演算法了。此文僅僅闡述程式碼細節,實現語言為Java

專案背景

  1. 專案是一個實驗室專案
  2. 其中有一個表叫做試驗表,用於儲存車型的試驗資料,每個試驗大概有6000條資料
  3. 總計初期約有2萬個車型,每個車型初期包含超過50個試驗。後期還會動態增長
  4. 試驗表中的資料僅需要根據車型試驗ID能取出來即可,沒有其他更復雜的業務邏輯

方案決策

專案正式上線初期,資料量不會直接爆發式增長到90億,需要時間上的積累(逐步做實驗),最終可能達到90億資料,甚至超過90億資料。

按照我們實際瞭解情況,oracle儲存資料量達到1千萬的時候,效能擅可。而Oracle官方的說法,如單表儲存1g有分割槽(大致500萬資料),查詢效率非常高。而試驗表中僅四個欄位,每條資料資料量較小。所以我們最終決定以1000萬為節點,水平拆表。當表資料達到1千萬時,即增加下一波表。進行資料自動遷移。

按照90億的總量,1000萬資料一個表的劃分,最終大致會產生900個左右的表。所以我們最終使用了4個數據庫。1個儲存其他業務模組的表,3個儲存此大資料表。每個資料庫大致有300張表。效能上和數量上都可達到我們的要求。

相關表結構

試驗資訊表(EXPERIMENT_MESSAGE),掛接車型和試驗的關係。試驗資料表(EXPERIMENT_DATA),儲存試驗資料

試驗資訊表:

欄位 含義
ID 主鍵,採用UUID生成
EXPERIMENT_ID 試驗表中的ID
CAR_ID 車型表中的ID
... 其餘數十個欄位省略

試驗資料表:

欄位 含義
ID 主鍵,採用UUID生成
EXPERIMENT_MESSAGE_ID 對應的實驗資訊id
X_VALUE 試驗資料X值
Y_VALUE 試驗資料Y值

我們採用作一致性hash的key,就是試驗資料表中的EXPERIMENT_MESSAGE_ID

欄位。也就是說,每個試驗資料表,不存則以,存則一次性大致有6000條資料。取同理。

一致性Hash演算法實現

一致性Hash演算法的hash部分,採用了著名的ketama演算法。在此,我們不多討論ketama演算法的細節,若各位有興趣,請查閱ketama演算法

    public long hash(String key) {
        if (md5 == null) {
            try {
                md5 = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException("no md5 algorythm found");
            }
        }

        md5.reset();
        md5.update(key.getBytes());
        byte[] bKey = md5.digest();

        long res = ((long) (bKey[3] & 0xFF) << 24) | 
                         ((long) (bKey[2] & 0xFF) << 16) | 
                         ((long) (bKey[1] & 0xFF) << 8) | 
                         (long) (bKey[0] & 0xFF);
        return res & 0xffffffffL;
    }

有了Hash的演算法,接下來就要構造Hash環了。Hash環採用的SortedMap資料結構實現。

private final SortedMap<Long, T> circle = new TreeMap<Long, T>();

其中新增節點和移除節點部分,需要根據hash演算法得到節點在環上的位置,具體程式碼如下:

    /**
     * 新增虛擬節點
     * numberOfReplicas為虛擬節點的數量,初始化hash環的時候傳入,我們使用300個虛擬節點
     * @param node
     */
    public void add(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.put(hashFunction.hash(node.toString() + i), node);
        }
    }

    /**
     * 移除節點
     * @param node
     */
    public void remove(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.remove(hashFunction.hash(node.toString() + i));
        }
    }

而hash環中得到節點部分比較特殊,根據一致性hash演算法的介紹,得到hash環中的節點,實際上是計算出的hash值順時針找到的第一個節點。

     /**
     * 獲得一個最近的順時針節點
     * @param key 為給定鍵取Hash,取得順時針方向上最近的一個虛擬節點對應的實際節點
     * @return
     */
    public T get(Object key) {
        if (circle.isEmpty()) {
            return null;
        }
        long hash = hashFunction.hash((String) key);
        if (!circle.containsKey(hash)) {
            //返回此對映的部分檢視,其鍵大於等於 hash
            SortedMap<Long, T> tailMap = circle.tailMap(hash); 
            hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
        }
        return circle.get(hash);
    }

單表拆分實踐

上面完成了一致性hash演算法的實現,包含了hash演算法和hash環的實現。接下來就要處理具體業務中,如何使用這個hash環和演算法了。

我們業務中,主要操作這張表的資料,也就是增刪查。然後我們資料庫拆分成了3個,所以需要增刪查的操作基本一致,都是先通過一致性hash得到庫,再通過一致性hash得到表。

獲取資料庫名的操作如下,獲取到資料庫後,根據資料庫名到對應的連線池中獲取連線。

    /**
     * 根據試驗資訊id獲取其所在庫名
     * DatabaseType為我們資料的列舉
     * @return 資料庫的名稱
     **/
    private String getDataBase(String experimentMessageId) {
        //獲取資料來源
        DatabaseType[] databasetype = DatabaseType.values();
        List<String> dataBaselist = new ArrayList<>();
        Map<String, DatabaseType> map = new HashMap<>();

        for (DatabaseType d:databasetype) {
            if (!d.equals(DatabaseType.KC)) {
                dataBaselist.add(d.toString());
                map.put(d.toString(), d);
            }
        }
        //獲取資料來源hash
        ConsistentHash<String> dataBaseCon = getConsistentHash(dataBaselist);

        //獲取id所在資料來源
        String dataBase = dataBaseCon.get(experimentMessageId);
        return dataBase;
    }

獲取表名的操作如下,獲取到資料庫後,在對應的資料庫中找到需要的表,再從該表中查詢資料。

    /**
     * 根據試驗資訊id獲取其試驗資料所在表
     * @return
     **/
    public String getTableName(String experimentMessageId) {
        String dataBase = getDataBase(experimentMessageId);
        //查詢所有試驗資料表
        List<String> tables = experimentDataEODao.queryTbaleNames(dataBase, tableName);
        ConsistentHash<String> consistentHash = getConsistentHash(tables);
        String tableName = consistentHash.get(experimentMessageId);
        return tableName;
    }

剩下的增刪改操作和平常一致,在此不多贅述。

資料遷移實踐

一致性hash勢必涉及到資料遷移問題,我們採取的資料遷移方式為定時任務,針對每個資料庫在每天夜裡全量掃描一次。檢查是否有資料量超過1000萬的表,若存在這樣的表,就把現有的表數量double。 資料遷移只會在同庫之間遷移,不會涉及跨資料庫的情況。 此方案為初步方案,後續會改進的更加智慧,根據表的數量,增加不同數量的表。而不是簡單的把表數量翻倍。 表建立後,將需要遷移的表資料逐個遷移。

在連線到資料來源後,我們做了如下事情進行資料遷移 1.獲取庫中所有的表

 List<String> tables = getTables(connection, p, d.toString());

2.遍歷表,檢查表中資料是否超過邊界線(我們為1000萬)

 for (int i = 0; i < tables.size(); i++) {
    //查詢表內資料量
    int num = countByTableName(connection, p, tables.get(i));
    //finalNum為邊界值,此處為1000萬
    if (num > finalNum) {
        ……
    }
    ……
}

3.根據所有的表計算現有的虛擬節點

ConsistentHash<String> consistentHashOld = getConsistentHash(tables);

4.把表加倍

List<String> tablesNew = deepCopy(tables); //注意一定要採用深複製
int tableSize = tablesNew.size();
for (int y = 0; y < tableSize; y++) {
    String tableNameNew = tableName + (tablesNew.size() + 1);
    //建立表
    createTable(connection, p, d.toString(), tableNameNew);
    tablesNew.add(tableNameNew);
    tableDelete.add(tableNameNew);
}

5.計算加倍後的虛擬節點

ConsistentHash<String> consistentHashNew = getConsistentHash(tablesNew);

6.資料遷移

for (int z = 0; z < tableSize; z++) {
    String tableNameOld = tablesNew.get(z);
    //查詢試驗資訊id不重複的試驗資料資訊
    List<String> disData = selectExperimentIdDis(connection, p, tableNameOld);
    List<String> deleteList = new LinkedList<>();
    for (String experimentId : disData) {
        //如果資料hash計算 原所在表與新建表之後不一致,執行轉移
        if (!consistentHashNew.get(experimentId).equals(consistentHashOld.get(experimentId))) {

            //新增到新表資料
            insertHash(connection, p, experimentId, consistentHashOld.get(experimentId),
            consistentHashNew.get(experimentId));

            //刪除資料集合
            deleteList.add(experimentId);
            //刪除舊錶資料
            final int defaultDelNum = 1000;
            if (deleteList.size() == defaultDelNum) {
                deleteInbatch(connection, p, deleteList, tableNameOld);
                deleteList.clear();
            }
        }
    }

    //刪除舊錶資料
    if (deleteList.size() > 0) {
        deleteInbatch(connection, p, deleteList, tableNameOld);
    }
}

總結

以上為我們所做的一致性hash實踐,其中還存在很多問題,比如遷移過程單執行緒導致遷移較慢、自動擴容機制不智慧、遷移過程中資料訪問不穩定等情況。

我們將會在後續的開發中逐步進行完善改進。

以上就是我們針對一致性hash在oracle分表中的實踐

參考