Clickhouse 分散式表&本地表
CK 分散式表和本地表
ck的表分為兩種:
-
分散式表
一個邏輯上的表, 可以理解為資料庫中的檢視, 一般查詢都查詢分散式表. 分散式表引擎會將我們的查詢請求路由本地表進行查詢, 然後進行彙總最終返回給使用者.
-
本地表:
實際儲存資料的表
1. 不寫分散式表的原因
- 分散式表接收到資料後會將資料拆分成多個parts, 並轉發資料到其它伺服器, 會引起伺服器間網路流量增加、伺服器merge的工作量增加, 導致寫入速度變慢, 並且增加了Too many parts的可能性.
- 資料的一致性問題, 先在分散式表所在的機器進行落盤, 然後非同步的傳送到本地表所在機器進行儲存,中間沒有一致性的校驗, 而且在分散式表所在機器時如果機器出現down機, 會存在資料丟失風險.
- 資料寫入預設是非同步的,短時間內可能造成不一致.
- 對zookeeper的壓力比較大(待驗證). 沒經過正式測試, 只是看到了有人提出.
2. Replication & Sharding
ClickHouse依靠ReplicatedMergeTree引擎族與ZooKeeper實現了複製表機制, 成為其高可用的基礎.
ClickHouse像ElasticSearch一樣具有資料分片(shard)的概念, 這也是分散式儲存的特點之一, 即通過並行讀寫提高效率. ClickHouse依靠Distributed引擎實現了分散式表機制, 在所有分片(本地表)上建立檢視進行分散式查詢.
3. Replicated Table & ReplicatedMergeTree Engines
不同於HDFS的副本機制(基於叢集實現), Clickhouse的副本機制是基於表實現的. 使用者在建立每張表的時候, 可以決定該表是否高可用.
Local_table
CREATE TABLE IF NOT EXISTS {local_table} ({columns}) ENGINE = ReplicatedMergeTree('/clickhouse/tables/#_tenant_id_#/#__appname__#/#_at_date_#/{shard}/hits', '{replica}') partition by toString(_at_date_) sample by intHash64(toInt64(toDateTime(_at_timestamp_))) order by (_at_date_, _at_timestamp_, intHash64(toInt64(toDateTime(_at_timestamp_))))
支援複製表的引擎都是ReplicatedMergeTree引擎族, 具體可以檢視官網:
ReplicatedMergeTree引擎族接收兩個引數:
- ZK中該表相關資料的儲存路徑, ClickHouse官方建議規範化, 例如:
/clickhouse/tables/{shard}/[database_name]/[table_name]
. - 副本名稱, 一般用
{replica}
即可.
ReplicatedMergeTree引擎族非常依賴於zookeeper, 它在zookeeper中儲存了大量的資料:
表結構資訊、元資料、操作日誌、副本狀態、資料塊校驗值、資料part merge過程中的選主資訊...
同時, zookeeper又在複製表急之下扮演了三種角色:
元資料儲存、日誌框架、分散式協調服務
可以說當使用了ReplicatedMergeTree
時, zookeeper壓力特別重, 一定要保證zookeeper叢集的高可用和資源.
3.1. 資料同步的流程
- 寫入到一個節點
- 通過
interserver HTTP port
埠同步到其他例項上 - 更新zookeeper叢集記錄的資訊
3.2. 重度依賴Zookeeper導致的問題
ck的replicatedMergeTree
引擎方案有太多的資訊儲存在zk上, 當資料量增大, ck節點數增多, 會導致服務非常不穩定, 目前我們的ck叢集規模還小, 這個問題還不嚴重, 但依舊會出現很多和zk有關的問題(詳見遇到的問題).
實際上 ClickHouse 把 ZK 當成了三種服務的結合, 而不僅把它當作一個 Coordinate service(協調服務), 可能這也是大家使用 ZK 的常用用法。ClickHouse 還會把它當作 Log Service(日誌服務),很多行為日誌等數字的資訊也會存在 ZK 上;還會作為表的 catalog service(元資料儲存),像表的一些 schema 資訊也會在 ZK 上做校驗,這就會導致 ZK 上接入的數量與資料總量會成線性關係。
目前針對這個問題, clickhouse社群提出了一個mini checksum
方案, 但是這並沒有徹底解決 znode 與資料量成線性關係的問題. 目前看到比較好的方案是位元組的:
我們就基於 MergeTree 儲存引擎開發了一套自己的高可用方案。我們的想法很簡單,就是把更多 ZK 上的資訊解除安裝下來,ZK 只作為 coordinate Service。只讓它做三件簡單的事情:行為日誌的 Sequence Number 分配、Block ID 的分配和資料的元資訊,這樣就能保證資料和行為在全域性內是唯一的。
關於節點,它維護自身的資料資訊和行為日誌資訊,Log 和資料的資訊在一個 shard 內部的副本之間,通過 Gossip 協議進行互動。我們保留了原生的 multi-master 寫入特性,這樣多個副本都是可以寫的,好處就是能夠簡化資料匯入。圖 6 是一個簡單的框架圖。
以這個圖為例,如果往 Replica 1 上寫,它會從 ZK 上獲得一個 ID,就是 Log ID,然後把這些行為和 Log Push 到叢集內部 shard 內部活著的副本上去,然後當其他副本收到這些資訊之後,它會主動去 Pull 資料,實現資料的最終一致性。我們現在所有叢集加起來 znode 數不超過三百萬,服務的高可用基本上得到了保障,壓力也不會隨著資料增加而增加。
4. Distributed Table & Distributed Engine
ClickHouse分散式表的本質並不是一張表, 而是一些本地物理表(分片)的分散式檢視,本身並不儲存資料. 分散式表建表的引擎為Distributed
.
Distrbuted_table
CREATE TABLE IF NOT EXISTS {distributed_table} as {local_table}
ENGINE = Distributed({cluster}, '{local_database}', '{local_table}', rand())
Distributed引擎需要以下幾個引數:
- 叢集識別符號
- 本地表所在的資料庫名稱
- 本地表名稱
- 分片鍵(sharding key) - 可選
該鍵與config.xml中配置的分片權重(weight)一同決定寫入分散式表時的路由, 即資料最終落到哪個物理表上. 它可以是表中一列的原始資料(如site_id
), 也可以是函式呼叫的結果, 如上面的SQL語句採用了隨機值rand()
. 注意該鍵要儘量保證資料均勻分佈, 另外一個常用的操作是採用區分度較高的列的雜湊值, 如intHash64(user_id)
.
4.1. 資料查詢的流程
- 各個例項之間會交換自己持有的分片的表資料
- 彙總到同一個例項上返回給使用者