1. 程式人生 > 其它 >七 ClickHouse分散式

七 ClickHouse分散式

1 在叢集的每個節點上安裝ck服務
2 <listen_host>::<listen_host>
3 配置zookeeper 正常啟動 

叢集是副本和分片的基礎,它將ClickHouse的服務拓撲由單節點延 伸到多個節點,但它並不像Hadoop生態的某些系統那樣,要求所有節點組成一個單一的大叢集。ClickHouse的叢集配置非常靈活,使用者既可以將所有節點組成一個單一叢集,也可以按照業務的訴求,把節點劃分為多個小的叢集。在每個小的叢集區域之間,它們的節點、分割槽和副本數量可以各不相同

另一種是從功能作用層面區分,使用副本的主要目的是防止資料丟失,增加資料儲存的冗餘;而使用分片的主要目的是實現資料的水平切分,

經講過MergerTree的命名規則。如果在*MergeTree的前面增加Replicated的字首,則能夠組合 成一個新的變種引擎,即Replicated-MergeTree複製表!

 

只有使用了ReplicatedMergeTree複製表系列引擎,才能應用副本的能力(後面會介紹另一種副本的實現方式)。或者用一種更為直接的方式理解,即使用ReplicatedMergeTree的資料表就是副本。 ReplicatedMergeTree是MergeTree的派生引擎,它在MergeTree的 基礎上加入了分散式協同的能力,

 

在MergeTree中,一個數據分割槽由開始建立到全部完成,會歷經兩類儲存區域。

(1)記憶體:資料首先會被寫入記憶體緩衝區。

(2)本地磁碟:資料接著會被寫入tmp臨時目錄分割槽,待全部完成後再將臨時目錄重新命名為正式分割槽。

ReplicatedMergeTree在上述基礎之上增加了ZooKeeper的部分,它會進一步在ZooKeeper內建立一系列的監聽節點,並以此實現多個例項之間的通訊。在整個通訊過程中,ZooKeeper並不會涉及表資料的傳輸。

  • 依賴ZooKeeper:在執行INSERT和ALTER查詢的時候,ReplicatedMergeTree需要藉助ZooKeeper的分散式協同能力,以實現多個副本之間的同步。但是在查詢副本的時候,並不需要使用 ZooKeeper。關於這方面的更多資訊,會在稍後詳細介紹。

  • 表級別的副本:副本是在表級別定義的,所以每張表的副本配置都可以按照它的實際需求進行個性化定義,包括副本的數量,以及副本在叢集內的分佈位置等。

  • 多主架構(Multi Master):可以在任意一個副本上執行INSERT和ALTER查詢,它們的效果是相同的。這些操作會藉助ZooKeeper的協同能力被分發至每個副本以本地形式執行。

  • Block資料塊:在執行INSERT命令寫入資料時,會依據 max_insert_block_size的大小(預設1048576行)將資料切分成若干個Block資料塊。所以Block資料塊是資料寫入的基本單元,並且具有 寫入的原子性和唯一性。

  • 原子性:在資料寫入時,一個Block塊內的資料要麼全部寫入成功,要麼全部失敗。

  • 唯一性:在寫入一個Block資料塊的時候,會按照當前Block資料塊的資料順序、資料行和資料大小等指標,計算Hash資訊摘要並記錄在案。在此之後,如果某個待寫入的Block資料塊與先前已被寫入的 Block資料塊擁有相同的Hash摘要(Block資料塊內資料順序、資料大小和資料行均相同),則該Block資料塊會被忽略。這項設計可以預防由異常原因引起的Block資料塊重複寫入的問題。

7.0 分片概念

通過引入資料副本,雖然能夠有效降低資料的丟失風險(多份儲存),並提升查詢的效能(分攤查詢、讀寫分離),但是仍然有一個問題沒有解決,那就是資料表的容量問題。到目前為止,每個副本自

身,仍然儲存了資料表的全量資料。所以在業務量十分龐大的場景中,依靠副本並不能解決單表的效能瓶頸。想要從根本上解決這類問題,需要藉助另外一種手段,即進一步將資料水平切分,也就是我們將要介紹的資料分片。ClickHouse中的每個服務節點都可稱為一個shard(分片)。從理論上來講,假設有N(N>=1)張資料表A,分佈在N個ClickHouse服務節點,而這些資料表彼此之間沒有重複資料,那麼就可以說資料表A擁有N個分片。然而在工程實踐中,如果只有這些分片表,那麼整個 Sharding(分片)方案基本是不可用的。對於一個完整的方案來說,還需要考慮資料在寫入時,如何被均勻地寫至各個shard,以及資料在查詢時,如何路由到每個shard,並組合成結果集。所以,ClickHouse

的資料分片需要結合Distributed表引擎一同使用

Distributed表引擎自身不儲存任何資料,它能夠作為分散式表的一層透明代理,在叢集內部自動開展資料的寫入、分發、查詢、路由等工作

7.1 配置zookeeper

需要在每臺CK的節點上配置ZK的位置

ClickHouse使用一組zookeeper標籤定義相關配置,預設情況下,在全域性配置config.xml中定義即可。但是各個副本所使用的Zookeeper 配置通常是相同的,為了便於在多個節點之間複製配置檔案,更常見的做法是將這一部分配置抽離出來,獨立使用一個檔案儲存。

首先,在伺服器的/etc/clickhouse-server/config.d目錄下建立一個名為metrika.xml的配置檔案:

<?xml version="1.0"?>
<yandex>
 <zookeeper-servers> 
 <node index="1"> 
 <host>doit01</host>
 <port>2181</port>
 </node>
  <node index="2"> 
 <host>doit02</host>
 <port>2181</port>
 </node>
  <node index="3"> 
 <host>doit03</host>
 <port>2181</port>
 </node>
 </zookeeper-servers>
</yandex>

  

接著,在全域性配置config.xml中使用<include_from>標籤匯入剛才定義的配置

9 <include_from>/etc/clickhouse-server/config.d/metrika.xml</include_from>

引用ZK的地址

 402  <zookeeper incl="zookeeper-servers" optional="false" />

incl與metrika.xml配置檔案內的節點名稱要彼此對應。至此,整個配置過程就完成了。

ClickHouse在它的系統表中,頗為貼心地提供了一張名為zookeeper的代理表。通過這張表,可以使用SQL查詢的方式讀取遠端ZooKeeper內的資料。有一點需要注意,在用於查詢的SQL語句中,必須指定path條件,

將配置檔案同步到其他叢集節點!!

scp -r config.d/  linux02:$PWD
scp -r config.d/  linux03:$PWD
​
scp config.xml   linux02:$PWD 
scp config.xml   linux03:$PWD  

7.2 建立副本表

在建立副本表以前, 首先要啟動叢集中的zookeeper

首先,由於增加了資料的冗餘儲存,所以降低了資料丟失的風險;其次,由於副本採用了多主

架構,所以每個副本例項都可以作為資料讀、寫的入口,這無疑分攤了節點的負載。

在使用單使用副本功能的時候 , 我們對CK叢集不需要任何的配置就可以實現資料的多副本儲存!只需要在建表的時候指定engine和ZK的位置即可 ;

ENGINE = ReplicatedMergeTree('zk_path', 'replica_name') 
​
-- /clickhouse/tables/{shard}/table_name
​
-- /clickhouse/tables/ 是約定俗成的路徑固定字首,表示存放資料表的根路徑。 

  

·{shard}表示分片編號,通常用數值替代,例如01、02、03。一張資料表可以有多個分片,而每個分片都擁有自己的副本。

·table_name表示資料表的名稱,為了方便維護,通常與物理表的名字相同(雖然ClickHouse並不強制要求路徑中的表名稱和物理表名相同);而replica_name的作用是定義在ZooKeeper中建立的副本名稱,該名稱是區分不同副本例項的唯一標識。一種約定成俗的命名方式是使用所在伺服器的域名稱。

對於zk_path而言,同一張資料表的同一個分片的不同副本,應該定義相同的路徑;而對於replica_name而言,同一張資料表的同一個分片的不同副本,應該定義不同的名稱

1) 一個分片 , 多個副本表

-- lixnu01 機器
create table tb_demo1 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/01/tb_demo1', 'linux01') 
order by id ;
-- lixnu02 機器
create table tb_demo1 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/01/tb_demo1', 'linux02') 
order by id ;
-- lixnu03 機器
create table tb_demo1 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/01/tb_demo1', 'linux03') 
order by id ;

  

檢視zookeeper中的內容

[zk: localhost:2181(CONNECTED) 0] ls /
[a, zookeeper, clickhouse, DNS, datanode1, server1, hbase]
[zk: localhost:2181(CONNECTED) 1] ls /clickhouse
[tables, task_queue]
[zk: localhost:2181(CONNECTED) 2] ls /clickhouse/tables
[01]
[zk: localhost:2181(CONNECTED) 3] ls /clickhouse/tables/01
[tb_demo1]
[zk: localhost:2181(CONNECTED) 4] ls /clickhouse/tables/01/tb_demo1
[metadata, temp, mutations, log, leader_election, columns, blocks, nonincrement_block_numbers, replicas, quorum, block_numbers]
[zk: localhost:2181(CONNECTED) 5] ls /clickhouse/tables/01/tb_demo1/replicas
[linux02, linux03, linux01]

  

SELECT *
FROM system.zookeeper
WHERE path = '/' ;
在任何一臺節點上,插入資料, 在其他節點上都能同步資料

2) 兩個分片 , 一個分片有副本一個分片沒有副本

-- lixnu01 機器
create table tb_demo2 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/01/tb_demo2', 'linux01') 
order by id ;
-- lixnu02 機器
create table tb_demo2 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/01/tb_demo2', 'linux02') 
order by id ;
-- lixnu03 機器
create table tb_demo2 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/01/tb_demo2', 'linux03') 
order by id ;
​
-------------------
​
-- lixnu01 機器
create table tb_demo2 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/01/tb_demo2', 'linux01') 
order by id ;
-- lixnu02 機器
create table tb_demo2 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/01/tb_demo2', 'linux02') 
order by id ;
-- lixnu03 機器
create table tb_demo2 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/02/tb_demo2', 'linux03') 
order by id ;
​
-- lixnu04 機器
create table tb_demo2 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/02/tb_demo2', 'linux03') 
order by id ;

7.3 分散式引擎

Distributed表引擎是分散式表的代名詞,它自身不儲存任何資料,而是作為資料分片的透明代理,能夠自動路由資料至叢集中的各個節點,所以Distributed表引擎需要和其他資料表引擎一起協同工作,

一般使用分散式表的目的有兩種,

  • 一種是表儲存多個副本並且有大量的併發操作,我們可以使用分散式表來分攤請求壓力解決併發問題

  • 一種是表特別大有多個切片組成 ,並且每切片資料也可以儲存資料副本

  • 本地表:通常以_local為字尾進行命名。本地表是承接資料的載體,可以使用非Distributed的任意表引擎,一張本地表對應了一個數據分片

  • 分散式表:通常以_all為字尾進行命名。分散式表只能使用Distributed表引擎,它與本地表形成一對多的對映關係,日後將通過分散式表代理操作多張本地表。

ENGINE = Distributed(cluster, database, table [,sharding_key]) 

 

  • cluster:叢集名稱,與叢集配置中的自定義名稱相對應。在對分散式表執行寫入和查詢的過程中,它會使用叢集的配置資訊來找到相應的host節點。

  • database和table:分別對應資料庫和表的名稱,分散式表使用這組配置對映到本地表。

  • sharding_key:分片鍵,選填引數。在資料寫入的過程中,分散式表會依據分片鍵的規則,將資料分佈到各個host節點的本地表。

 

7.3.1 沒有副本

本示例是,使用某個叢集 , 建立多分片無副本的表配置了一個叢集 cluster1 叢集中有三臺機器ck1 ck2 ck3,沒有副本,如果在這個叢集上建表, 表資料會有三個切片 ,沒有儲存資料副本

<clickhouse_remote_servers>
<cluster1>
<!-- 叢集名為cluster1 整個叢集中每個表有三個分片,分別在lx01 lx02 lx03上 -->
 <shard>
 <replica>
 <host>linux01</host>
 <port>9000</port>
 </replica>
 </shard>
 <shard>
 <replica>
 <host>linux02</host>
 <port>9000</port>
 </replica>
 </shard>
 <shard>
 <replica>
 <host>linux03</host>
 <port>9000</port>
 </replica>
 </shard>
</cluster1>
 <cluster2>
<!-- 叢集名為cluster2 一個切片 三個副本 -->
 <shard>  
 <replica>
 <host>linux01</host>
 <port>9000</port>
 </replica>
 <replica>
 <host>linux02</host>
 <port>9000</port>
 </replica>
 <replica>
 <host>linux03</host>
 <port>9000</port>
 </replica>
 </shard>
</cluster2>
    <!--叢集三  多個分片  保留副本 注意一個主機只使用一次 -->
<cluster3>
 <shard>  
 <replica>
 <host>doit01</host>
 <port>9000</port>
 </replica>
 <replica>
 <host>doit02</host>
 <port>9000</port>
 </replica>
 </shard>
  <shard>  
 <replica>
 <host>doit03</host>
 <port>9000</port>
 </replica>
 <replica>
 <host>doit04</host>
 <port>9000</port>
 </replica>
 </shard>
</cluster3>
    
</clickhouse_remote_servers>

  

同步配置檔案 到叢集中
-- 建立本地表 
create table tb_demo3 on cluster cluster1(
id  Int8 ,
name String 
)engine=MergeTree() 
order by  id ;
-- 建立分散式表 
create table demo3_all on cluster cluster1 engine=Distributed('cluster1','default','tb_demo3',id) as tb_demo3 ;
--向分散式表中插入資料 ,資料會根據插入規則將資料插入到不同的分片中

  

7.3.2 有副本的配置

<!-- 配置叢集2 , 叢集中的表有兩個分片 ,其中分片1 有兩個副本 -->
<cluster2>
 <shard>
    <replica>
        <host>linux01</host>
        <port>9000</port>
    </replica>
    <replica>
        <host>linux02</host>
        <port>9000</port>
    </replica>
 </shard>
 <shard>
    <replica>
        <host>linux03</host>
        <port>9000</port>
    </replica>
 </shard>
</cluster2>

  

-- 建立本地表
create table tb_demo4 on cluster cluster2(
id  Int8 ,
name String 
)engine=MergeTree() 
order by  id ;
-- 建立分散式表
create table demo4_all on cluster cluster2 engine=Distributed('cluster2','default','tb_demo4',id) as tb_demo4 ;

第一 : 1) 資料1分片的時候 , 多個副本 可以不使用分散式表 第二: 2)有多個分片的時候使用分散式表 給分片分配資料

多個分片的表
多個副本表 
多分片 多副本
  一個節點在一個叢集中只能使用一次

  

7.4 分散式DDL

ClickHouse支援叢集模式,一個叢集擁有1到多個節點。CREATE、ALTER、DROP、RENMAE及TRUNCATE這些DDL語句,都支援分散式執行。這意味著,如果在叢集中任意一個節點上執行DDL語句,那麼叢集中的 每個節點都會以相同的順序執行相同的語句。這項特性意義非凡,它就如同批處理命令一樣,省去了需要依次去單個節點執行DDL的煩惱。將一條普通的DDL語句轉換成分散式執行十分簡單,只需加上ON CLUSTER cluster_name宣告即可。例如,執行下面的語句後將會對 ch_cluster叢集內的所有節點廣播這條DDL語句:

-- 建表 on cluster cluster1
create table tb_demo3 on cluster cluster1(
id  Int8 ,
name String 
)engine=MergeTree() 
order by  id ;
-- 刪除叢集中所有的本地表或者是分散式表
drop table if exists tb_demo3 on cluster cluster1;
-- 修改叢集中的表結構 
alter table t3 on cluster cluster1 add column age Int8 ;
-- 刪除欄位 
-- 刪除分割槽 

  

7.5 分散式協同原理

副本協同的核心流程主要有INSERT、MERGE、MUTATION和ALTER四種,分別對應了資料寫入、分割槽合併、資料修改和元資料修改。INSERT和ALTER查詢是分散式執行的。藉助 ZooKeeper的事件通知機制,多個副本之間會自動進行有效協同,但是它們不會使用ZooKeeper儲存任何分割槽資料。而其他查詢並不支援分散式執行,包括SELECT、CREATE、DROP、RENAME和ATTACH。例如,為了建立多個副本,我們需要分別登入每個ClickHouse節點。接下來,會依次介紹上述流程的工作機理。為了便於理解,我先來整體認識一下各個流程的介紹方法。

7.5.1 insert原理

7.5.2 Merge原理

無論MERGE操作從哪個副本發起,其合併計劃都會交由主副本來制定,和insert一樣

7.5.3 mutation原理

alter table x update name=zss where

alter table x delete where

當對ReplicatedMergeTree執行ALTER DELETE或者ALTER UPDATE操作的時候,即會進入MUTATION部分的邏輯,它的核心流程如圖

7.5.4 alter原理

當對ReplicatedMergeTree執行ALTER操作進行元資料修改的時候,即會進入ALTER部

分的邏輯,例如增加、刪除表字段等。