1. 程式人生 > >kafka connect簡介以及部署

kafka connect簡介以及部署

1、什麼是kafka connect?

根據官方介紹,Kafka Connect是一種用於在Kafka和其他系統之間可擴充套件的、可靠的流式傳輸資料的工具。它使得能夠快速定義將大量資料集合移入和移出Kafka的聯結器變得簡單。 Kafka Connect可以獲取整個資料庫或從所有應用程式伺服器收集指標到Kafka主題,使資料可用於低延遲的流處理。匯出作業可以將資料從Kafka topic傳輸到二次儲存和查詢系統,或者傳遞到批處理系統以進行離線分析。 Kafka Connect功能包括:

  • Kafka connector通用框架,提供統一的整合API
  • 同時支援分散式模式和單機模式
  • REST 介面,用來檢視和管理Kafka connectors
  • 自動化的offset管理,開發人員不必擔心錯誤處理的影響
  • 分散式、可擴充套件
  • 流/批處理整合

KafkaCnnect有兩個核心概念:Source和Sink。 Source負責匯入資料到Kafka,Sink負責從Kafka匯出資料,它們都被稱為Connector。

 

2、kafka connect概念。

Kafka connect的幾個重要的概念包括:connectors、tasks、workers和converters。

  • Connectors-通過管理任務來細條資料流的高階抽象
  • Tasks- 資料寫入kafka和資料從kafka讀出的實現
  • Workers-執行connectors和tasks的程序
  • Converters- kafka connect和其他儲存系統直接傳送或者接受資料之間轉換資料

1) Connectors:在kafka connect中,connector決定了資料應該從哪裡複製過來以及資料應該寫入到哪裡去,一個connector例項是一個需要負責在kafka和其他系統之間複製資料的邏輯作業,connector plugin是jar檔案,實現了kafka定義的一些介面來完成特定的任務。

2)  Tasks:task是kafka connect資料模型的主角,每一個connector都會協調一系列的task去執行任務,connector可以把一項工作分割成許多的task,然後再把task分發到各個worker中去執行(分散式模式下),task不自己儲存自己的狀態資訊,而是交給特定的kafka 主題去儲存(config.storage.topic 和status.storage.topic)。在分散式模式下有一個概念叫做任務再平衡(Task Rebalancing),當一個connector第一次提交到叢集時,所有的worker都會做一個task rebalancing從而保證每一個worker都運行了差不多數量的工作,而不是所有的工作壓力都集中在某個worker程序中,而當某個程序掛了之後也會執行task rebalance。

3) Workers:connectors和tasks都是邏輯工作單位,必須安排在程序中執行,而在kafka connect中,這些程序就是workers,分別有兩種worker:standalone和distributed。這裡不對standalone進行介紹,具體的可以檢視官方文件。我個人覺得distributed worker很棒,因為它提供了可擴充套件性以及自動容錯的功能,你可以使用一個group.ip來啟動很多worker程序,在有效的worker程序中它們會自動的去協調執行connector和task,如果你新加了一個worker或者掛了一個worker,其他的worker會檢測到然後在重新分配connector和task。

4) Converters: converter會把bytes資料轉換成kafka connect內部的格式,也可以把kafka connect內部儲存格式的資料轉變成bytes,converter對connector來說是解耦的,所以其他的connector都可以重用,例如,使用了avro converter,那麼jdbc connector可以寫avro格式的資料到kafka,當然,hdfs connector也可以從kafka中讀出avro格式的資料。


3、kafka connect的啟動。

Kafka connect的工作模式分為兩種,分別是standalone模式和distributed模式。

在獨立模式種,所有的work都在一個獨立的程序種完成,如果用於生產環境,建議使用分散式模式,都在真的就有點浪費kafka connect提供的容錯功能了。

standalone啟動的命令很簡單,如下:

bin/connect-standalone.shconfig/connect-standalone.properties connector1.properties[connector2.properties ...]

一次可以啟動多個connector,只需要在引數中加上connector的配置檔案路徑即可。

啟動distributed模式命令如下:

bin/connect-distributed.shconfig/connect-distributed.properties

在connect-distributed.properties的配置檔案中,其實並沒有配置了你的connector的資訊,因為在distributed模式下,啟動不需要傳遞connector的引數,而是通過REST API來對kafka connect進行管理,包括啟動、暫停、重啟、恢復和檢視狀態的操作,具體介紹詳見下文。

在啟動kafkaconnect的distributed模式之前,首先需要建立三個主題,這三個主題的配置分別對應connect-distributed.properties檔案中config.storage.topic(default connect-configs)、offset.storage.topic (default connect-offsets) 、status.storage.topic (default connect-status)的配置,那麼它們分別有啥用處呢?

  • config.storage.topic:用以儲存connector和task的配置資訊,需要注意的是這個主題的分割槽數只能是1,而且是有多副本的。(推薦partition 1,replica 3)
  • offset.storage.topic:用以儲存offset資訊。(推薦partition50,replica 3)
  • status.storage.topic:用以儲存connetor的狀態資訊。(推薦partition10,replica 3)

以下是建立主題命令:

# config.storage.topic=connect-configs
$ bin/kafka-topics --create --zookeeper localhost:2181 --topicconnect-configs --replication-factor 3 --partitions 1
 
# offset.storage.topic=connect-offsets
$ bin/kafka-topics --create --zookeeper localhost:2181 --topicconnect-offsets --replication-factor 3 --partitions 50
 
# status.storage.topic=connect-status
$ bin/kafka-topics --create --zookeeper localhost:2181 --topicconnect-status --replication-factor 3 --partitions 10

4、通過rest api管理connector

因為kafka connect的意圖是以服務的方式去執行,所以它提供了REST API去管理connectors,預設的埠是8083,你也可以在啟動kafka connect之前在配置檔案中新增rest.port配置。

  • GET /connectors – 返回所有正在執行的connector名
  • POST /connectors – 新建一個connector; 請求體必須是json格式並且需要包含name欄位和config欄位,name是connector的名字,config是json格式,必須包含你的connector的配置資訊。
  • GET /connectors/{name} – 獲取指定connetor的資訊
  • GET /connectors/{name}/config – 獲取指定connector的配置資訊
  • PUT /connectors/{name}/config – 更新指定connector的配置資訊
  • GET /connectors/{name}/status – 獲取指定connector的狀態,包括它是否在執行、停止、或者失敗,如果發生錯誤,還會列出錯誤的具體資訊。
  • GET /connectors/{name}/tasks – 獲取指定connector正在執行的task。
  • GET /connectors/{name}/tasks/{taskid}/status – 獲取指定connector的task的狀態資訊
  • PUT /connectors/{name}/pause – 暫停connector和它的task,停止資料處理知道它被恢復。
  • PUT /connectors/{name}/resume – 恢復一個被暫停的connector
  • POST /connectors/{name}/restart – 重啟一個connector,尤其是在一個connector執行失敗的情況下比較常用
  • POST /connectors/{name}/tasks/{taskId}/restart – 重啟一個task,一般是因為它執行失敗才這樣做。
  • DELETE /connectors/{name} – 刪除一個connector,停止它的所有task並刪除配置。