1. 程式人生 > 其它 >clickhouse的分散式Distributed表引擎

clickhouse的分散式Distributed表引擎

  具有分散式引擎的表不儲存自己的任何資料,但允許在多個伺服器上進行分散式查詢處理。讀取是自動並行的。在讀取期間,將使用遠端伺服器上的表索引(如果有的話)。

一、建立表 

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Distributed(cluster, database
, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]

 

  1.來源表

  當Distributed表指向當前伺服器上的表時,可以採用該表的模式:

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2 ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...
]
  分散式引數
  • cluster- 伺服器配置檔案中的叢集名稱

  • database- 遠端資料庫的名稱

  • table- 遠端表的名稱

  • sharding_key- (可選)分片鍵

  • policy_name-(可選)策略名稱,它將用於儲存非同步傳送的臨時檔案

  分散式設定

  • fsync_after_insert-fsync非同步插入分散式後的檔案資料。保證作業系統將整個插入的資料重新整理到啟動器節點磁碟上的檔案中。

  • fsync_directories-fsync為目錄做。保證作業系統在分散式表上與非同步插入相關的操作後(插入後、將資料傳送到分片後等)重新整理目錄元資料。

  • bytes_to_throw_insert- 如果超過此數量的壓縮位元組將等待非同步 INSERT,則會引發異常。0 - 不扔。預設為 0。

  • bytes_to_delay_insert- 如果超過此數量的壓縮位元組將等待非同步 INSERT,則查詢將被延遲。0 - 不延遲。預設為 0。

  • max_delay_to_insert- 如果有大量用於非同步傳送的待處理位元組,則以秒為單位將資料插入分散式表的最大延遲。預設 60。

  • monitor_batch_inserts- 與Distributed_directory_monitor_batch_inserts相同

  • monitor_split_batch_on_failure- 與distributed_directory_monitor_split_batch_on_failure相同

  • monitor_sleep_time_ms- 與Distributed_directory_monitor_sleep_time_ms相同

  • monitor_max_sleep_time_ms- 與Distributed_directory_monitor_max_sleep_time_ms相同

    注意:

  - 當資料首先儲存在啟動器節點磁碟上,然後非同步傳送到分片時,僅影響非同步 INSERT(即 `insert_distributed_sync=false`)。
  - 可能會顯著降低刀片的效能
  - 影響將分散式表文件夾中儲存的資料寫入接受插入的**節點**。 如果需要保證將資料寫入底層 MergeTree 表 

例子

CREATE TABLE hits_all AS hits
ENGINE = Distributed(logs, default, hits[, sharding_key[, policy_name]])
SETTINGS
    fsync_after_insert=0,
    fsync_directories=0;

 

  資料將從叢集中的所有伺服器讀取logs,從default.hits位於叢集中每臺伺服器上的表中讀取。資料不僅在遠端伺服器上被讀取,而且在遠端伺服器上進行部分處理(在可能的範圍內)。例如,對於帶有 的查詢GROUP BY,資料將在遠端伺服器上聚合,聚合函式的中間狀態將被髮送到請求伺服器。然後將進一步彙總資料。

  可以使用返回字串的常量表達式來代替資料庫名稱。例如:currentDatabase()

二、叢集 

  叢集在伺服器配置檔案中配置:

<remote_servers>
    <logs>
        <!-- Inter-server per-cluster secret for Distributed queries
             default: no secret (no authentication will be performed)

             If set, then Distributed queries will be validated on shards, so at least:
             - such cluster should exist on the shard,
             - such cluster should have the same secret.

             And also (and which is more important), the initial_user will
             be used as current user for the query.
        -->
        <!-- <secret></secret> -->
        <shard>
            <!-- Optional. Shard weight when writing data. Default: 1. -->
            <weight>1</weight>
            <!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
            <internal_replication>false</internal_replication>
            <replica>
                <!-- Optional. Priority of the replica for load balancing (see also load_balancing setting). Default: 1 (less value has more priority). -->
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <internal_replication>false</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <secure>1</secure>
                <port>9440</port>
            </replica>
        </shard>
    </logs>
</remote_servers>

 

  這裡定義了一個叢集,其名稱logs由兩個分片組成,每個分片包含兩個副本。分片是指包含不同部分資料的伺服器(為了讀取所有資料,必須訪問所有分片)。副本是複製伺服器(為了讀取所有資料,可以訪問任何一個副本上的資料)。

  叢集名稱不能包含點。

  為每個伺服器指定引數host,port和可選user的 , passwordsecure:compression

  • host– 遠端伺服器的地址。可以使用域或 IPv4 或 IPv6 地址。如果指定域,則伺服器在啟動時會發出 DNS 請求,只要伺服器正在執行,就會儲存結果。如果 DNS 請求失敗,則伺服器不會啟動。如果更改 DNS 記錄,請重新啟動伺服器。
  • port– 信使活動的 TCP 埠(tcp_port在配置中,通常設定為 9000)。不要與http_port.
  • user– 連線遠端伺服器的使用者名稱。預設值為default使用者。此使用者必須有權連線到指定的伺服器。訪問在users.xml檔案中配置。有關詳細資訊,請參閱訪問許可權部分。
  • password– 連線遠端伺服器的密碼(未遮蔽)。預設值:空字串。
  • secure- 是否使用安全的 SSL/TLS 連線。通常還需要指定埠(預設安全埠是9440)。伺服器應該監聽<tcp_port_secure>9440</tcp_port_secure>並配置正確的證書。
  • compression- 使用資料壓縮。預設值:true

  指定副本時,讀取時將為每個分片選擇一個可用副本。可以配置負載平衡演算法(訪問哪個副本的首選項)如果未建立與伺服器的連線,則將嘗試連線一個短暫的超時。如果連線失敗,將選擇下一個副本,以此類推所有副本。如果所有副本的連線嘗試都失敗,則嘗試以相同的方式重複幾次。這有利於彈性,但不提供完整的容錯能力:遠端伺服器可能接受連線,但可能無法正常工作,或者工作不佳。

  可以僅指定一個分片(在這種情況下,查詢處理應稱為遠端,而不是分散式)或最多指定任意數量的分片。在每個分片中,可以指定從一個到任意數量的副本。可以為每個分片指定不同數量的副本。

  可以在配置中指定任意數量的叢集。

  要檢視的叢集,請使用該system.clusters表。

  該Distributed引擎允許使用像本地伺服器這樣的叢集。但是,叢集的配置不能動態指定,必須在伺服器配置檔案中進行配置。通常,叢集中的所有伺服器都將具有相同的叢集配置(儘管這不是必需的)。配置檔案中的叢集會即時更新,無需重新啟動伺服器。

  如果每次都需要向一組未知的分片和副本傳送查詢,則無需建立Distributed表 -remote而是使用 table 函式。

三、寫入資料 

  1.將資料寫入叢集有兩種方法:

  1)可以定義將哪些資料寫入哪些伺服器並直接在每個分片上執行寫入。換句話說,對表所指向INSERT的叢集中的遠端表執行直接語句。Distributed這是最靈活的解決方案,因為可以使用任何分片方案,即使是由於主題領域的要求而並非微不足道的分片方案。這也是最優化的解決方案,因為資料可以完全獨立地寫入不同的分片。

  2)可以在表上執行INSERT語句Distributed在這種情況下,表將在伺服器本身之間分配插入的資料。為了寫入Distributed表,它必須sharding_key配置引數(除非只有一個分片)。

       2.注意:

  每個分片都可以<weight>在配置檔案中定義。預設情況下,權重為1資料以與分片權重成比例的數量分佈在分片上。將所有分片權重相加,然後將每個分片的權重除以總和,以確定每個分片的比例。例如,如果有兩個分片,第一個的權重為 1,而第二個的權重為 2,第一個將被髮送三分之一 (1 / 3) 的插入行,第二個將被髮送三分之二 (2 / 3)。

  每個分片都可以internal_replication在配置檔案中定義引數。如果此引數設定為true,則寫入操作會選擇第一個健康的副本並向其寫入資料。如果表基礎的Distributed表是複製表(例如任何Replicated*MergeTree表引擎),請使用此選項。其中一個表副本將接收寫入,並將自動複製到其他副本。

  如果internal_replication設定為false(預設值),則將資料寫入所有副本。在這種情況下,Distributed表本身會複製資料。這比使用複製表更糟糕,因為不檢查副本的一致性,並且隨著時間的推移,它們將包含稍微不同的資料。

  為了選擇將一行資料傳送到的分片,分析分片表示式,並將其除以分片的總權重得到餘數。prev_weights該行被髮送到對應於餘數從到的半區間的分片prev_weights + weight,其中prev_weights是編號最小的分片的總權重,並且weight是該分片的權重。例如,如果有兩個分片,第一個的權重為 9,而第二個的權重為 10,則該行將傳送到第一個分片以獲取範圍 [0, 9) 中的餘數,併發送到第二個用於範圍 [9, 19) 的餘數。

  分片表示式可以是返回整數的常量和表列中的任何表示式。例如,可以使用表示式rand()進行資料的隨機分佈,或者UserID通過除以使用者 ID 的餘數進行分佈(然後單個使用者的資料將駐留在單個分片上,這簡化了執行INJOIN按使用者)。如果其中一列分佈不夠均勻,可以將其包裝在雜湊函式中,例如intHash64(UserID).

  除法的簡單餘數是分片的有限解決方案,並不總是合適的。它適用於中型和大量資料(數十臺伺服器),但不適用於非常大量的資料(數百臺或更多伺服器)。在後一種情況下,使用主題區域所需的分片方案,而不是使用表中的條目Distributed

在以下情況下,應該關注分片方案:

  • 使用需要通過特定鍵連線資料(IN或)的查詢。JOIN如果資料被這個鍵分片,你可以使用 local INorJOIN代替GLOBAL INor GLOBAL JOIN,這樣效率更高。
  • 使用大量伺服器(數百臺或更多)和大量小型查詢,例如,查詢單個客戶(例如網站、廣告商或合作伙伴)的資料。為了使小查詢不影響整個叢集,將單個客戶端的資料定位在單個分片上是有意義的。或者,可以設定雙層分片:將整個叢集劃分為“層”,其中一層可能由多個分片組成。單個客戶端的資料位於單個層上,但可以根據需要將分片新增到層中,並且資料在其中隨機分佈。Distributed為每一層建立表,併為全域性查詢建立一個共享分散式表。

  資料是非同步寫入的。當插入表中時,資料塊只是寫入本地檔案系統。資料會盡快在後臺傳送到遠端伺服器。傳送資料的週期由distributed_directory_monitor_sleep_time_msdistributed_directory_monitor_max_sleep_time_ms設定管理。Distributed引擎會單獨傳送每個包含插入資料的檔案,但可以使用 Distributed_directory_monitor_batch_inserts 設定啟用批量傳送檔案此設定通過更好地利用本地伺服器和網路資源來提高叢集效能。應該通過查看錶目錄中的檔案列表(等待發送的資料)來檢查資料是否傳送成功:/var/lib/clickhouse/data/database/table/執行後臺任務的執行緒數可以通過background_distributed_schedule_pool_size設定來設定。

  INSERT如果伺服器停止存在或在訪問表後粗略重新啟動(例如,由於硬體故障)Distributed,則插入的資料可能會丟失。如果在表目錄中檢測到損壞的資料部分,則將其轉移到broken子目錄中,不再使用。

  3.應用

  直接寫分散式表的優點自然是可以讓ClickHouse控制資料到分片的路由,而缺點:

  • 資料是先寫到一個分散式表的例項中並快取起來,再逐漸分發到各個分片上去,實際是雙寫了資料(寫入放大),浪費資源;
  • 資料寫入預設是非同步的,短時間內可能造成不一致;
  • 目標表中會產生較多的小parts,使merge(即compaction)過程壓力增大。

  相對而言,直接寫本地表是同步操作,更快,parts的大小也比較合適,但是就要求應用層額外實現sharding和路由邏輯,如輪詢或者隨機等。

  以下為分散式表插入流程圖:

 

 

四、讀取資料

  查詢Distributed表時,SELECT查詢被髮送到所有分片並且無論資料如何在分片中分佈(它們可以完全隨機分佈)都可以工作。新增新分片時,不必將舊資料傳輸到其中。相反,可以通過使用更重的權重向其寫入新資料——資料將稍微不均勻地分佈,但查詢將正確有效地工作。

  啟用該max_parallel_replicas選項後,查詢處理將在單個分片內的所有副本中並行處理。

  在分散式表上執行查詢的流程簡圖如下所示。發出查詢後,各個例項之間會交換自己持有的分片的表資料,最終彙總到同一個例項上返回給使用者。

 

 

五、虛擬列

  _shard_num— 包含shard_num表中的值system.clusters。型別:UInt32。

  注意

  由於遠端叢集表功能在內部建立臨時分散式表,_shard_num因此在那裡也可用。

六、總結

  ClickHouse分散式表的本質並不是一張表,而是一些本地物理表(分片)的分散式檢視,本身並不儲存資料。

  在生產環境中總是推薦寫本地表、讀分散式表。