關於Neo4j和Cypher批量更新和批量插入優化的5個建議
原文連結: http://jexp.de/blog/2017/03/5-tips-tricks-for-fast-batched-updates-of-graph-structures-with-neo4j-and-cypher
注:我在測試後,對原文中的部分Cypher語句進行修改,使得其符合語法規則
當通過程式向圖形化資料庫中寫入大量資料的時候,你會希望它能夠高效的處理。
低效的方式
下面這些方式不是十分有效:
- 將值直接寫入到語句中,而不是通過引數的方式
- 每一個更新都通過一個Transaction傳送一個請求
- 通過一個Transaction傳送大量的單個請求
- 生成一個巨大複雜的語句(幾百行),然後通過一個Transaction進行提交
- 在一個Transaction中,傳送一個巨大的請求,會導致OOM錯誤
正確的方式
你需要構造儘可能小的請求,並且語句格式固定(這樣可以利用快取),然後通過引數方式進行使用。
每一個請求可以只修改一個屬性,或者修改整個子圖(上百個節點),但是它的語句結構必須是一致的,否則就不能使用快取。
UNWIND – 救星
為了實現這個目標,你只需要在你單次請求的前面加上一個UNWIND語句。UNWIND會將大量的資料(高達10k或者50k條)分散成一行一行的,每一行都會包含每一次更新所需要的全部資訊。
你新增一個{batch}引數,並且將它的值設定成一個Map列表,其中可以包含你的資料(10k或者50k條)。這些資料會被打包成一個完整的請求,並且符合語法結構,還用上了快取(因為其結構一致)。
語法結構
輸入:
{batch: [{row1},{row2},{row3},...10k]}
語句:
UNWIND {batch} as row
// 基於每一行的Map資料,編寫更新語句
示例
下面是一些示例
建立節點並寫入屬性
資料:
{batch: [{name:"Alice",age:32},{name:"Bob",age:42}]}
語句:
UNWIND {batch} as row
CREATE (n:Label)
SET n.name = row.name, n.age = row.age
Merge節點並寫入屬性
資料:
{batch: [{id:" [email protected]",properties:{name:"Alice",age:32}},{id:"[email protected]",properties:{name:"Bob",age:42}}]}
語句:
UNWIND {batch} as row
MERGE (n:Label {id:row.id})
(ON CREATE) SET n.name = row.properties.name, n.age = row.properties.age
尋找節點,建立/Merge關係,並寫入屬性
資料:
{batch: [{from:"[email protected]",to:"[email protected]",properties:{since:2012}},{from:"[email protected]",to:"[email protected]",properties:{since:2016}}]}
語句:
UNWIND {batch} as row
MATCH (from:Label {from:row.from})
MATCH (to:Label {to:row.to})
CREATE/MERGE (from)-[rel:KNOWS]->(to)
(ON CREATE) SET rel.since = row.properties.since
通過id或者id列表找節點
對於多叉樹很好用
在這裡我們只傳入了一個單獨的屬性created
。實際上你可以不傳入任何屬性,或者傳入一個map的屬性來進行更新。
資料:
{batch: [{from:123,to:[44,12,128],created:"2016-01-13"}, {from:34,to:[23,35,2983],created:"2016-01-15"},...]}
語句:
UNWIND {batch} as row
MATCH (from) WHERE id(from) = row.from
MATCH (to) WHERE id(from) IN row.to // list of ids
CREATE/MERGE (from)-[rel:FOO]->(to)
SET rel.created = row.created
更快更高效
下面是一些更多的技巧。
你可以傳入一個Map,其中的key是節點id或者關係id。這樣以來,通過id查詢會變得更高效。
通過id更新已有的節點
資料:
{ batch : [{"1":334,"2":222,3:3840, ... 100k}]}
語句:
WITH {batch} as data, [k in keys({batch}) | toInt(k)] as ids
MATCH (n) WHERE id(n) IN ids
// 單個屬性更新
SET n.count = data[toString(id(n))]
通過id更新已有的關係
資料:
{ batch : [{"1":334,"2":222,3:3840, ... 100k}]}
- 1
語句:
WITH {batch} as data, [k in keys({batch}) | toInt(k)] as ids
MATCH ()-[rel]->() WHERE id(rel) IN ids
SET rel.foo = data[toString(id(rel))]
有條件的建立資料
有些時候,你希望根據輸入動態的建立資料。但是Cypher目前沒有諸如WHEN
或者IF
的條件語句,CASE WHEN
也只是一個表示式,因此,你必須使用一個我多年前想出來的技巧。
Cypher提供FOREACH
語句,用來遍歷列表中的每一個元素並分別執行更新操作。於是,一個包含0個元素或者1個元素的列表則可以看成一個條件表示式。因為當0個元素的時候,就不會執行遍歷,而當1個元素的時候,就只執行一次遍歷。
大致思路如下:
...
FOREACH (_ IN CASE WHEN predicate THEN [true] ELSE [] END |
... update operations ....
)
其中,列表中的true
值可以是其他任何值,42,"",null
等等。只要它是一個值,那麼我們就可以得到一個非空的列表。
相似的,你也可以使用RANGE(1, CASE WHEN predicate THEN 1 ELSE 0 END)
。當predicate的值為false的時候,就會範圍一個空列表。或者,如果你喜歡使用filter
,那麼也可以通過filter(_ IN [1] WHERE predicate)
來構造。
下面是一個完整的示例:
LOAD CSV FROM {url} AS row
MATCH (o:Organization {name:row.org})
FOREACH (_ IN case when row.type = 'Person' then [1] else [] end|
MERGE (p:Person {name:row.name})
CREATE (p)-[:WORKS_FOR]->(o)
)
FOREACH (_ IN case when row.type = 'Agency' then [1] else [] end|
MERGE (a:Agency {name:row.name})
CREATE (a)-[:WORKS_FOR]->(o)
)
需要注意的是,在FOREACH
內部建立的變數無法在外部訪問。你需要再重新查詢一次,或者你需要再FOREACH
內完成全部更新操作。
使用APOC庫
APOC庫提供了很多有用的方法供你使用。在這裡,我推薦下面3個方法:
- 建立節點和關係,並且可以動態設定標籤和屬性
- 批量提交和更新
- 動態建立或者操作Map,並賦給屬性
動態建立節點和關係
通過apoc.create.node
和apoc.create.relationship
你可以動態的計算節點標籤,關係型別和任意的屬性。
- 標籤是一個String陣列
- 屬性就是一個Map
UWNIND {batch} as row
CALL apoc.create.node(row.labels, row.properties) yield node
RETURN count(*)
在apoc.create.*
方法中,也提供了設定/更新/刪除屬性和標籤的功能。
UWNIND {batch} as row
MATCH (from) WHERE id(n) = row.from
MATCH (to:Label) where to.key = row.to
CALL apoc.create.relationship(from, row.type, row.properties, to) yield rel
RETURN count(*)
批量提交
在一開始j就提到了,大量的提交Transaction是有問題的。你可以用2G-4G的heap來更新百萬條記錄,但當量級更大了之後就會很困難了。在使用32G的heap下,我最大的Transaction可以達到10M的節點。
這時,apoc.periodic.iterate
可以提供很大的幫助。
它的原理很簡單:你有兩個Cypher語句,第一條語句能夠提供可操縱的資料併產生巨大的資料流,第二條語句執行真正的更新操作,它對每一個數據都進行一次更新操作,但是它只在處理一定數量的資料後才建立一個新的Transaction。
打個比方,假如你第一條語句返回了五百萬個需要更新的節點,如果使用內部語句的話,那麼每一個節點都會進行一次更新操作。但是如果你設定批處理大小為10k的話,那麼每一個Transaction會批量更新10k的節點。
如果你的更新操作是相互獨立的話(建立節點,更新屬性或者更新獨立的子圖),那麼你可以新增parallel:true
來充分利用cpu。
比方說,你想計算多個物品的評分,並通過批處理的方式來更新屬性,你應該按下面這樣操作
call apoc.periodic.iterate('
MATCH (n:User)-[r1:LIKES]->(thing)<-[r2:RATED]-(m:User) WHERE id(n)<id(m) RETURN thing, avg( r1.rating + r2.rating ) as score
','
WITH {thing} as t SET t.score = {score}
', {batchSize:10000, parallel:true})
動態建立/更新Map
儘管Cypher為列表提供了相當遍歷的操作,如range, collect, unwind, reduce, extract, filter, size
等,但Map在有的時候也是需要進行建立和更改的。
apoc.map.*
提供了一系列的方法來簡化這個過程。
通過其他資料建立Map:
RETURN apoc.map.fromPairs([["alice",38],["bob",42],...])
// {alice:38, bob: 42, ...}
RETURN apoc.map.fromLists(["alice","bob",...],[38,42])
// {alice:38, bob: 42, ...}
// groups nodes, relationships, maps by key, good for quick lookups by that key
RETURN apoc.map.groupBy([{name:"alice",gender:"female"},{name:"bob",gender:"male"}],"gender")
// {female:{name:"alice",gender:"female"}, male:{name:"bob",gender:"male"}}
RETURN apoc.map.groupByMulti([{name:"alice",gender:"female"},{name:"bob",gender:"male"},{name:"Jane",gender:"female"}],"gender")
// {female:[{name:"alice",gender:"female"},{name:"jane",gender:"female"}], male:[{name:"bob",gender:"male"}]}
更新Map:
RETURN apoc.map.merge({alice: 38},{bob:42})
// {alice:38, bob: 42}
RETURN apoc.map.setKey({alice:38},"bob",42)
// {alice:38, bob: 42}
RETURN apoc.map.removeKey({alice:38, bob: 42},"alice")
// {bob: 42}
RETURN apoc.map.removeKey({alice:38, bob: 42},["alice","bob","charlie"])
// {}
// remove the given keys and values, good for data from load-csv/json/jdbc/xml
RETURN apoc.map.clean({name: "Alice", ssn:2324434, age:"n/a", location:""},["ssn"],["n/a",""])
// {name:"Alice"}
結論
通過上面這些方式,我能夠快速的執行更新操作。當然,你也可以組合這些方法,來實現更復雜的操作。