1. 程式人生 > 實用技巧 >圖解JanusGraph系列 - 關於JanusGraph圖資料批量快速匯入的方案和想法(bulk load data)

圖解JanusGraph系列 - 關於JanusGraph圖資料批量快速匯入的方案和想法(bulk load data)

大家好,我是洋仔,JanusGraph圖解系列文章,實時更新~

圖資料庫文章總目錄:

原始碼分析相關可檢視github碼文不易,求個star~): https://github.com/YYDreamer/janusgraph

版本:JanusGraph-0.5.2

轉載文章請保留以下宣告:

作者:洋仔聊程式設計

微信公眾號:匠心Java

原文地址:https://liyangyang.blog.csdn.net/

前言

JanusGraph的批量匯入速度一直是使用者使用的痛點, 下面會依託官網的介紹和個人理解,聊一下關於圖資料批量快速匯入的一些方案、方案使用場景和一些想;

寫這篇文章的目的主要是為了讓大家瞭解一下janus的匯入的一些常用方案,算是一個總結吧,如有疑問或者文章錯誤,歡迎留言聯絡我

首先,說一下JanusGraph的批量匯入的可配置的優化配置選項 和 基於第三方儲存和索引的優化配置選項:

  • 批量匯入的配置選項
  • 第三方儲存後端的優化選項(Hbase為例)
  • 第三方索引後端的優化選項(ES為例)

之後分析一下資料匯入的四個方案:

  • 基於JanusGraph Api的批量匯入
  • 基於Gremlin Server的批量匯入
  • 使用JanusGraph-utils的批量匯入
  • 基於bulk loader 匯入方式
  • 基於抽取序列化邏輯生成Hfile離線批量匯入

最後聊一下關於批量匯入的一些想法;

一:批量匯入的優化配置選項

1、批量匯入的配置選項

JanusGraph中有許多配置選項和工具可以將大量的圖資料更有效地匯入。這種匯入稱為批量載入,與預設的事務性載入相反,預設的事務性載入單個事務只會新增少量資料。

下述介紹了配置選項和工具,這些工具和工具使JanusGraph中的批量載入更加高效。

在繼續操作之前,請仔細遵守每個選項的限制和假設,以免丟失資料或損壞資料。

配置選項

janusgraph支援批量匯入,可通過相關配置項設定

下面具體看一下對應配置項的詳細作用:

批量載入

1) 配置項:storage.batch-loading

啟用該配置項,相當於打開了JanusGraph的批量匯入開關;

影響:

啟用批處理載入會在許多地方禁用JanusGraph內部一致性檢查,重要的是會禁用lock鎖定來保證分散式一致性;JanusGraph假定要載入到JanusGraph中的資料與圖形一致,因此出於效能考慮禁用了自己的一致性檢查。

換句話說,我們要在匯入資料之前,保證要匯入的資料和圖中已有的資料不會產生衝突也就是保持一致性!

為什麼要禁用一致性檢查來提升效能?

在許多批量載入方案中,在載入資料之前確保去資料一致性,然後在將資料載入到資料庫比在載入資料到相簿時確保資料一致性,消耗的成本要便宜的多。

例如,將現有使用者資料檔案批量載入到JanusGraph中的用例:假設使用者名稱屬性鍵具有定義的唯一複合索引,即使用者名稱在整個圖中必須是唯一的。

那麼按名稱對資料檔案進行排序並過濾出重複項或編寫執行此類過濾的Hadoop作業消耗的時間成本,就會比開啟一致性檢查在匯入圖資料時janusgraph檢查花費的成本要少的多。

基於上述,我們可以啟用 storage.batch-loading配置,從而大大減少了批量載入時間,因為JanusGraph不必檢查每個新增的使用者該名稱是否已存在於資料庫中。

重要提示

啟用storage.batch-loading要求使用者確保載入的資料在內部是一致的,並且與圖中已存在的任何資料一致。

特別是,啟用批處理載入時,併發型別建立會導致嚴重的資料完整性問題。因此,我們強烈建議通過schema.default = none在圖形配置中進行設定來禁用自動型別建立。

優化ID分配

1、ID塊大小

1)配置項:ids.block-size

該配置項為配置在分散式id的生成過程中每次獲取 id block的大小;

分散式id相關具體可看文章《圖解Janusgraph系列-分散式id生成策略分析》

原理:

每個新新增的頂點或邊都分配有唯一的ID。JanusGraph的ID池管理器以block的形式獲取特定JanusGraph例項的ID。id塊獲取過程很昂貴,因為它需要保證塊的全域性唯一分配。

增加 ids.block-size會減少獲取次數,但可能會使許多ID未被分配,從而造成浪費。對於事務性工作負載,預設塊大小是合理的,但是在批量載入期間,頂點和邊的新增要頻繁得多,而且要快速連續。

因此,通常建議將塊大小增加10倍或更多,具體取決於每臺機器要新增的頂點數量。

經驗法則

設定ids.block-size為您希望每小時為每個JanusGraph例項新增的頂點數。

重要提示:

必須為所有JanusGraph例項配置相同的值,ids.block-size以確保正確的ID分配。因此,在更改此值之前,請務必關閉所有JanusGraph例項

2、ID Acquisition流程

當許多JanusGraph例項頻繁並行分配id塊時,不可避免地會出現例項之間的分配衝突,從而減慢了分配過程。

此外,由於大容量載入而導致的增加的寫負載可能會使該過程進一步減慢到JanusGraph認為失敗並引發異常的程度。可以調整2個配置選項來避免這種情況;

1)配置項:ids.authority.wait-time

配置ID池管理器等待應用程式獲取ID塊被儲存後端確認的時間(以毫秒為單位)。這段時間越短,應用程式在擁擠的儲存群集上發生故障的可能性就越大。

經驗法則

將其設定為負載下儲存後端叢集上測量的第95百分位讀寫時間的總和。

重要說明

所有JanusGraph例項的該值都應該相同。

2)配置項:ids.renew-timeout

配置JanusGraph的ID池管理器在嘗試獲取新的ID塊總共等待的毫秒數。

經驗法則

將此值設定為儘可能大,不必為不可恢復的故障等待太久。增加它的唯一缺點是JanusGraph將在不可用的儲存後端群集上嘗試更長的時間

優化讀寫

1、緩衝區大小

JanusGraph在資料匯入時存在一個緩衝區,用來緩衝當前事務的部分請求,從而可以小批量的寫入和執行,從而減少針對儲存後端的請求數。在短時間內執行大量寫操作時,儲存後端可能會因為大量的寫請求打入而變得超負荷;

配置項:storage.buffer-size

這些批次的大小由storage.buffer-size來控制。 增加storage.buffer-size可以通過增加緩衝區大小,來使得批次儲存更多的請求,從而減少寫請求的次數來避免上述失敗。

注意:

增加緩衝區大小會增加寫請求的等待時間及其失敗的可能性。因此,不建議為事務性負載增加此設定,並且應該在批量載入期間仔細嘗試此設定的一個合適的值。

2、讀寫健壯性

在批量載入期間,群集上的負載通常會增加,從而使讀和寫操作失敗的可能性更大(尤其是如上所述,如果緩衝區大小增加了)。

1)配置項:storage.read-attempts

該配置項配置JanusGraph在放棄之前嘗試對儲存後端執行讀取或寫入操作的次數。

2)配置項:storage.attempt-wait

改配置項指定JanusGraph在重新嘗試失敗的後端操作之前將等待的毫秒數。較高的值可以確保重試操作不會進一步增加後端的負載。

注意:

如果在批量載入期間後端上可能會有很高的負載,通常建議增加這些配置選項。

其他

1)配置項:storage.read-attempts

2、第三方儲存後端的優化選項

針對於第三方儲存的優化分為兩部分:

  • 第三方儲存叢集自身的優化
  • JanusGraph結合第三方儲存的優化選項

叢集自身的優化

叢集自身的優化,本文主要介紹janusgraph相關優化這裡就不多說這部分了,主要是提升hbase叢集的讀寫能力;

這裡主要還是關注的Hbase的寫資料能力優化後的提升!這部分的優化至關重要! 下面舉幾個例子:

1)配置項: hbase.client.write.buffer

設定buffer的容量

HBase Client會在資料累積到設定的閾值後才提交Region Server。這樣做的好處在於可以減少RPC連線次數。

計算一下服務端因此而消耗的記憶體:hbase.client.write.buffer * hbase.regionserver.handler.count從在減少PRC次數和增加伺服器端記憶體之間找到平衡點。

2)配置項: hbase.regionserver.handler.count

定義每個Region Server上的RPC Handler的數量

Region Server通過RPC Handler接收外部請求並加以處理。所以提升RPC Handler的數量可以一定程度上提高HBase接收請求的能力。

當然,handler數量也不是越大越好,這要取決於節點的硬體情況。

等等各種配置項

3)針對一些CF、RowKey設計之類的優化點,因為這些都是janus預設好的,所以在janusGraph中使用不到;

JanusGraph針對優化

針對於JanusGraph+第三方儲存的優化,官網(配置項文件超連結) 給出了一些配置選項,可從其找出對應的配置項;

針對於hbase,我在配置項中找出了對應的一些可能有作用的配置如下:

1)配置項: storage.hbase.compression-algorithm

hbase儲存資料壓縮演算法的配置,我們在《圖解相簿JanusGraph系列-一文知曉“圖資料“底層儲存結構》文章中提到有好幾個地方都是壓縮儲存的,此處就是配置的壓縮演算法;

型別: 列舉值,支援 lzogzsnappylz4bzip2zstd五種壓縮演算法 和 不壓縮配置:none

預設值: gz壓縮;

注意:此處配置的演算法需要hbase也支援才可以! 如果儲存空間足夠,可以考慮配置為不壓縮,也會提升匯入速率!

2)配置項:storage.hbase.skip-schema-check

假設JanusGraph的HBase表和列族已經存在。 如果是這樣,JanusGraph將不會檢查其 table/ CF 的存在,也不會在任何情況下嘗試建立它們。

型別: 布林值

預設值: false,檢查

注意: 可以在資料匯入時,將該配置項設定為true,去除table/ CF的檢查,這個其實作用不大;因為都是在初始化圖例項的時候就去檢查了。。

3、第三方索引後端的優化選項

針對於第三方存索引的優化分為兩部分:

  • 第三方索引叢集自身的優化
  • JanusGraph結合第三方索引的優化選項

叢集自身的優化

叢集自身的優化,本文主要介紹janusgraph相關優化這裡就不多說這部分了,主要是提升索引叢集的讀寫能力;

這裡主要還是關注的索引的寫資料能力優化後的提升!這部分的優化至關重要!

例如es的執行緒池引數優化等

JanusGraph針對優化

針對於JanusGraph+第三方索引的優化,官網(配置項文件超連結) 給出了一些配置選項,可從其找出對應的配置項;

針對於es,我再配置項中找出了對應的一些可能有作用的配置如下:

1)配置項: index.[X].elasticsearch.retry_on_conflict

指定在發生衝突時應重試操作多少次。

型別: 整數

預設值: 0次

注意: 增大該值可以提升在批量匯入中,發生衝突後解決衝突的機率

3、JVM的優化

JanusGraph基於Java語言編寫,則毋庸置疑會用到JVM

對JVM的調優也主要集中到垃圾收集器和堆記憶體的調優

堆大小調整:

我們在匯入圖資料時會產生大量的臨時資料,這裡需要我們調整一個合適的堆空間;

推薦至少為8G

垃圾收集器調優:

如果在使用CMS發現GC過於頻繁的話,我們可以考慮將垃圾收集器設定為:G1

這個收集器適用於大堆空間的垃圾收集,有效的減少垃圾收集消耗的時間;

注意:

此處的JVM調優設計JanusGraph java api專案gremlin server部分的JVM調優;

二:基於資料層面的優化

2.1 拆分圖 併發執行

在某些情況下,圖資料可以分解為多個斷開連線的子圖。這些子圖可以跨多臺機器獨立地並行載入;不管是採用下述的那種方式載入都可以;

這裡有一個前提: 底層第三方儲存叢集的處理能力沒有達到最大; 如果底層儲存叢集當前的平均cpu已經是80 90%的了,就算拆分多個圖也沒用,底層儲存的處理能力已經給限制住當前的速度了;

這個方式官網上提了一句,這個地方其實很難可以將圖拆分為斷開的子圖,並且針對於拆分為多個子圖來說,主要還是依託於底層儲存叢集的處理能力;

一般情況下,不用拆分圖進行一個好的優化後,底層儲存叢集的處理能力都可以完全呼叫起來;

2.2 分步驟 併發執行

如果無法分解圖形,則分多個步驟載入通常是有益的,也就是將vertex 和 edge 分開匯入;

這種方式,需要資料同學做好充分的資料探查,不然可能會產生資料不一致的情況! 下面是步驟(其中最後兩個步驟可以在多臺計算機上並行執行):

  1. 前提: 確保vertex和edge資料集 刪除了重複資料 並且是一致的
  2. 環境配置: 設定batch-loading=true 並且優化上述介紹的其他選項
  3. vertex全量匯入: 將所有的vertex及節點對應的property新增到圖中。維護一份從頂點ID(由載入的資料使用者自定義)到JanusGraph的內部頂點分散式一致性ID(即vertex.getId())的對映,該ID 為64位長
  4. edge全量匯入: 使用對映新增所有的邊 來查詢JanusGraph的頂點id 並使用該id檢索頂點。

講述過程:

假設存在3個使用者,“-”號後為對應的自定義的頂點id值(注意,非匯入相簿中的頂點id,只是標識當前節點的業務id):

user1-1
user2-2
user3-3

上述第三步,我們將這些節點匯入到相簿中! 產生一個業務id 與 相簿中節點的分散式唯一id的對應關係如下:

我們在匯入一個點後,janus會返回一個vertex例項物件,通過這個物件就可以拿到對應的相簿vertexId

業務id-相簿中節點id
1-4261
2-4274
3-4351

注意:這一步驟,我們可以多執行緒並行匯入而無需擔心一致性問題,因為節點全部唯一

節點匯入完成!

假設存在對應的有3條邊如下,

edge1:user1 --> user2 
edge2:user1 --> user3
edge3:user2 --> user3

我們通過user1對應業務id:1,而業務id:1對應節點id:4261,我們就可以轉化為下述對應關係:

4261 --> 4274
4261 --> 4351
4274 --> 4351

在JanusGraph中通過節點id查詢節點,是獲取節點的最快方式!!

我們就可以通過id獲取相簿中對應的vertex物件例項,然後使用addVertex將edge匯入!

注意:這一步驟,我們可以多執行緒並行匯入而無需擔心一致性問題,因為edge全部唯一

第三個步驟和第四個步驟也可以並行執行,我們在匯入點的過程中,可以也同時將源節點和目標節點已經匯入到相簿中的edge同步入圖;

三:批量匯入方案

下述介紹一下4種匯入方案,其中包含3中批量匯入方案;

3.1 方案一:基於JanusGraph Api的資料匯入

該方案可以整合上述第二部分二:基於資料層面的優化

涉及方法:

public JanusGraphVertex addVertex(Object... keyValues);
public JanusGraphEdge addEdge(String label, Vertex vertex, Object... keyValues);

在janusGraph的業務專案中,可以開發一個數據匯入模組,使用提供的類似於java api等,進行資料的匯入;

流程:

這種是最簡單的方案,具體的細節,這裡就不給出了,節點匯入大體流程為下述:

  1. 獲取圖例項
  2. 獲取圖例項事務物件
  3. 插入節點
  4. 提交事務

邊匯入大體流程如下:

  1. 獲取圖例項
  2. 獲取圖例項事務物件
  3. 查詢源節點 + 目標節點(這個地方可能是效能瓶頸)
  4. 在兩個節點中插入邊
  5. 提交事務

主要作用:

此方案可以用於資料量較小的情況下使用,例如每天的增量匯入等;

優化點:

1、批量事務提交

此處的事務提交,我們可以通過一個常用的優化手段: 處理多個vertex 或者 edge後再提交事務!

可以減少janus與底層儲存的互動,減少網路消耗和連線數,提升匯入的效能!

處理的個數多少主要還是和底層儲存叢集相關,幾百還是幾千這就需要自己除錯獲取當前環境下的最優配置了

注意:

如果開啟了上述提到的storage.batch-loading,則需要你們現在的環境下注意一致性的問題;

例如相簿中原本存在一個a節點,你又插入一個a節點,便會有一致性問題;

我們可以通過插入資料前,先通過唯一索引查詢節點,節點存在則更新節點,不存在則插入節點;

3.2 方案二:基於Gremlin Server的批量匯入

該方案可以整合上述第二部分二:基於資料層面的優化

這裡需要我們搭建一個Gremlin server伺服器,通過在伺服器執行gremlin-server.sh即可,暴露出一個tcp介面;

則可以將對應的gremlin 語句提交到對應的gremlin伺服器執行;

具體的流程和第一個方案一致

優化點:

同上一個方案優化點1;

3、gremlin server池引數調整

除了上述給定的一些配置的優化項,還有兩個gremlin server的優化項需要調整

  • threadPoolWorke:最大2*core個數,用於處理非阻塞讀寫的Gremlin伺服器可用的執行緒數;

  • gremlinPool:用於在ScriptEngine中執行實際指令碼的“Gremlin”執行緒的數量。此池表示Gremlin伺服器中可用於處理阻塞操作的工作者;

和執行緒池調優一樣,要找出最合適的一個值,太小不好,太大也不好;

注意:

該方案本質上和第一個方案類似,只不過是一個是通過給定的java api提交插入請求,一個直接通過gremlin語句提交插入請求到gremlin server;

3.3 方案三:IBM的janusgraph-utils

這個方案沒用過,簡單看了一下,這個主要也是通過多執行緒對資料進行匯入;

自己手動組裝對應的schema檔案,將schema匯入到資料庫;

然後將組裝為特定格式的csv檔案中的資料,匯入到相簿中;

github地址: https://github.com/IBM/janusgraph-utils

優點:

1、使用難度不高,讓我們不用再去手寫多執行緒的匯入了;減少工作量

2、直連hbase和es,相對於前兩種減少了對應的gremlin server和janus server的網路互動

3、支援通過配置檔案自動建立Janusgraph schema和index

4、可配置化的執行緒池大小和每次批量提交的數量

問題:

1、schema和csv檔案也是要使用者組裝出對應格式

2、相對於前兩種方式效能提升有限,主要是少了一層網路互動。多執行緒和批量提交,前兩種都可以手動去實現;還需要引入一個新的元件

3、支援janus版本較低,可以手動升級,不難

4、相對於下面兩種方案,效能還是較低

3.4 方案四:bulk loader

官方提供的批量匯入方式;需要hadoop叢集和spark叢集的支援;

hadoop和spark叢集配置,可以看官網:https://docs.janusgraph.org/advanced-topics/hadoop/

該方案對匯入的資料有著嚴格的要求,支援多住資料格式:jsoncsvxmlkryo

資料要求: 節點、節點對應的屬性、節點對應的邊需要在一行中(一個json中、一個xml項中)

資料案例: 下面給一下官網的案例,在data目錄下:

-- json格式
{"id":2,"label":"song","inE":{"followedBy":[{"id":0,"outV":1,"properties":{"weight":1}},{"id":323,"outV":34,"properties":{"weight":1}}]},"outE":{"followedBy":[{"id":6190,"inV":123,"properties":{"weight":1}},{"id":6191,"inV":50,"properties":{"weight":1}}],"sungBy":[{"id":7666,"inV":525}],"writtenBy":[{"id":7665,"inV":525}]},"properties":{"name":[{"id":3,"value":"IM A MAN"}],"songType":[{"id":5,"value":"cover"}],"performances":[{"id":4,"value":1}]}}

-- xml格式
<node id="4"><data key="labelV">song</data><data key="name">BERTHA</data><data key="songType">original</data><data key="performances">394</data></node><node id="5"><data key="labelV">song</data><data key="name">GOING DOWN THE ROAD FEELING BAD</data><data key="songType">cover</data><data key="performances">293</data></node><node id="6"><data key="labelV">song</data><data key="name">MONA</data><data key="songType">cover</data><data key="performances">1</data></node><node id="7"><data key="labelV">song</data><data key="name">WHERE HAVE THE HEROES GONE</data><data key="songType"></data><data key="performances">0</data></node>

-- csv格式
2,song,IM A MAN,cover,1 followedBy,50,1|followedBy,123,1|sungBy,525|writtenBy,525       followedBy,1,1|followedBy,34,1

我們可以觀察到,這其實是不容易構造的,節點屬性邊全部需要整合到一塊;

資料整理方案: spark的cogroup, cogroup的作用就是將多個 RDD將相同的key jion成一行,從而使用csv格式進行匯入,操作實示例如下:

val rdd1 = sc.parallelize(Array(("aa",1),("bb",2),("cc",6)))
val rdd2 = sc.parallelize(Array(("aa",3),("dd",4),("aa",5)))
rdd1.cogroup(rdd2).collect()

output:
(aa,(CompactBuffer(1),CompactBuffer(3, 5)))
(dd,(CompactBuffer(),CompactBuffer(4)))
(bb,(CompactBuffer(2),CompactBuffer()))
(cc,(CompactBuffer(6),CompactBuffer()))

這裡大家可以參考360對這方面的處理,轉化程式碼github地址:https://github.com/360jinrong/janusgraph-data-importer

注意:

此處的原始資料的準備需要細緻,一致性保證完全依賴於原始資料的一致性保證;

3.5 方案五:基於抽取序列化邏輯的生成Hbase File離線批量匯入

博主在相簿初始化時採用了這種方式,前前後後花費了接近一個月的時間,經過細緻的驗證,現已應用到生產環境使用,下面介紹一下對應的注意點和主要流程:

方案: 依據原始碼抽取出對應的序列化邏輯,分散式生成Hfile,將Hfile匯入到Hbase;

問題: 人力成本過高,需要看原始碼抽邏輯,並且需要一個細緻的驗證;

方案難點:

JanusGraph對於Hbase的資料底層格式,可以看我寫的部落格:

這兩篇部落格,一個分析了底層儲存的格式,一個進行了相應的原始碼分析;

流程+驗證+建議: 請看我寫的另外一個部落格:《圖解JanusGraph系列-生成Hbase file離線批量匯入方案》

這種方式,其實消耗的人力會比較大;另外,對於抽取的邏輯是否開源,這個後續我們會考慮這個問題,開源後地址會同步更新到本文章;

四:幾種場景

4.1 相簿中已經存在資料

如果相簿總已經存在資料,對於3.4 方案四:bulk loader3.5 方案五:基於抽取序列化邏輯的生成Hbase File離線批量匯入 這兩種方案可能就無法使用了;

我們可以採取兩種方式:

  1. 使用第一種方案和第二種方案進行匯入(注意資料一致性)
  2. 整體遷移相簿,將相簿中現有資料和將要匯入的資料整體遷移到另外一個新相簿,就可以使用4、5方案進行匯入

4.2 圖資料初始化或者遷移

資料量小,建議使用3.1 方案一:基於JanusGraph Api的資料匯入3.2 方案二:基於Gremlin Server的批量匯入3.3 方案三:IBM的janusgraph-utils

資料量大,建議使用3.4 方案四:bulk loader3.5 方案五:基於抽取序列化邏輯的生成Hbase File離線批量匯入

4.3 單純只看業務資料量

選擇什麼方式匯入,單純基於業務資料量給一些個人建議:

  • 小資料量(億級以下): 直接janusgraph api 或者 gremlin server匯入即可,幾小時就ok了; 如果想要更快可以使用另外的方式,只是會增加人力成本;
  • 中等資料量(十億級以下):資料充分探查,開啟storage.batch-loading完全可以支援,使用api,2天左右可以完成全量的資料匯入
  • 大資料量(百億級資料):推薦採用bulk load方式,配置hadoop叢集,使用spark cluster匯入
  • 另一個方案:如果上述還是無法滿足你們的需求,可以採用依據原始碼抽取序列化邏輯生成Hfile,然後離線匯入到Hbase的方案,不過這種是花費人力成本最大的一種方式,不過效果也幾乎是最好的,尤其是資料量越大效果越明顯

總結

資料的批量匯入一直是JanusGraph讓人難受的地方,經過本文的介紹大家應該有一個大體的認識,針對於百億級的資料匯入,上述的幾種方案是可以支援的;

其他:批量匯入後,每天的增量採用訊息中介軟體接入JanusGraph api匯入即可;

資料匯入過程中,針對於不同的底層儲存、不同的版本還是會有一些問題,具體的匯入的坑大家可以加我v,邀你加群

注意!!!以上僅作為參考,有任何問題可評論或加博主v討論

參考:
JanusGaph官網
https://www.jianshu.com/p/f372f0ef6c42
https://www.jianshu.com/p/4b59c00a15de/