kafka connect簡介以及部署
1、什麼是kafka connect?
根據官方介紹,Kafka Connect是一種用於在Kafka和其他系統之間可擴充套件的、可靠的流式傳輸資料的工具。它使得能夠快速定義將大量資料集合移入和移出Kafka的聯結器變得簡單。 Kafka Connect可以獲取整個資料庫或從所有應用程式伺服器收集指標到Kafka主題,使資料可用於低延遲的流處理。匯出作業可以將資料從Kafka topic傳輸到二次儲存和查詢系統,或者傳遞到批處理系統以進行離線分析。 Kafka Connect功能包括:
- Kafka connector通用框架,提供統一的整合API
- 同時支援分散式模式和單機模式
- REST 介面,用來檢視和管理Kafka connectors
- 自動化的offset管理,開發人員不必擔心錯誤處理的影響
- 分散式、可擴充套件
- 流/批處理整合
KafkaCnnect有兩個核心概念:Source和Sink。 Source負責匯入資料到Kafka,Sink負責從Kafka匯出資料,它們都被稱為Connector。
2、kafka connect概念。
Kafka connect的幾個重要的概念包括:connectors、tasks、workers和converters。
- Connectors-通過管理任務來細條資料流的高階抽象
- Tasks- 資料寫入kafka和資料從kafka讀出的實現
- Workers-執行connectors和tasks的程序
- Converters- kafka connect和其他儲存系統直接傳送或者接受資料之間轉換資料
1) Connectors:在kafka connect中,connector決定了資料應該從哪裡複製過來以及資料應該寫入到哪裡去,一個connector例項是一個需要負責在kafka和其他系統之間複製資料的邏輯作業,connector plugin是jar檔案,實現了kafka定義的一些介面來完成特定的任務。
2) Tasks:task是kafka connect資料模型的主角,每一個connector都會協調一系列的task去執行任務,connector可以把一項工作分割成許多的task,然後再把task分發到各個worker中去執行(分散式模式下),task不自己儲存自己的狀態資訊,而是交給特定的kafka 主題去儲存(config.storage.topic 和status.storage.topic)。在分散式模式下有一個概念叫做任務再平衡(Task Rebalancing),當一個connector第一次提交到叢集時,所有的worker都會做一個task rebalancing從而保證每一個worker都運行了差不多數量的工作,而不是所有的工作壓力都集中在某個worker程序中,而當某個程序掛了之後也會執行task rebalance。
3) Workers:connectors和tasks都是邏輯工作單位,必須安排在程序中執行,而在kafka connect中,這些程序就是workers,分別有兩種worker:standalone和distributed。這裡不對standalone進行介紹,具體的可以檢視官方文件。我個人覺得distributed worker很棒,因為它提供了可擴充套件性以及自動容錯的功能,你可以使用一個group.ip來啟動很多worker程序,在有效的worker程序中它們會自動的去協調執行connector和task,如果你新加了一個worker或者掛了一個worker,其他的worker會檢測到然後在重新分配connector和task。
4) Converters: converter會把bytes資料轉換成kafka connect內部的格式,也可以把kafka connect內部儲存格式的資料轉變成bytes,converter對connector來說是解耦的,所以其他的connector都可以重用,例如,使用了avro converter,那麼jdbc connector可以寫avro格式的資料到kafka,當然,hdfs connector也可以從kafka中讀出avro格式的資料。
3、kafka connect的啟動。
Kafka connect的工作模式分為兩種,分別是standalone模式和distributed模式。
在獨立模式種,所有的work都在一個獨立的程序種完成,如果用於生產環境,建議使用分散式模式,都在真的就有點浪費kafka connect提供的容錯功能了。
standalone啟動的命令很簡單,如下:
bin/connect-standalone.shconfig/connect-standalone.properties connector1.properties[connector2.properties ...]
一次可以啟動多個connector,只需要在引數中加上connector的配置檔案路徑即可。
啟動distributed模式命令如下:
bin/connect-distributed.shconfig/connect-distributed.properties
在connect-distributed.properties的配置檔案中,其實並沒有配置了你的connector的資訊,因為在distributed模式下,啟動不需要傳遞connector的引數,而是通過REST API來對kafka connect進行管理,包括啟動、暫停、重啟、恢復和檢視狀態的操作,具體介紹詳見下文。
在啟動kafkaconnect的distributed模式之前,首先需要建立三個主題,這三個主題的配置分別對應connect-distributed.properties檔案中config.storage.topic(default connect-configs)、offset.storage.topic
(default connect-offsets
) 、status.storage.topic
(default connect-status)的配置,那麼它們分別有啥用處呢?
- config.storage.topic:用以儲存connector和task的配置資訊,需要注意的是這個主題的分割槽數只能是1,而且是有多副本的。(推薦partition 1,replica 3)
- offset.storage.topic:用以儲存offset資訊。(推薦partition50,replica 3)
- status.storage.topic:用以儲存connetor的狀態資訊。(推薦partition10,replica 3)
以下是建立主題命令:
# config.storage.topic=connect-configs
$ bin/kafka-topics --create --zookeeper localhost:2181 --topicconnect-configs --replication-factor 3 --partitions 1
# offset.storage.topic=connect-offsets
$ bin/kafka-topics --create --zookeeper localhost:2181 --topicconnect-offsets --replication-factor 3 --partitions 50
# status.storage.topic=connect-status
$ bin/kafka-topics --create --zookeeper localhost:2181 --topicconnect-status --replication-factor 3 --partitions 10
4、通過rest api管理connector
因為kafka connect的意圖是以服務的方式去執行,所以它提供了REST API去管理connectors,預設的埠是8083,你也可以在啟動kafka connect之前在配置檔案中新增rest.port配置。
- GET /connectors – 返回所有正在執行的connector名
- POST /connectors – 新建一個connector; 請求體必須是json格式並且需要包含name欄位和config欄位,name是connector的名字,config是json格式,必須包含你的connector的配置資訊。
- GET /connectors/{name} – 獲取指定connetor的資訊
- GET /connectors/{name}/config – 獲取指定connector的配置資訊
- PUT /connectors/{name}/config – 更新指定connector的配置資訊
- GET /connectors/{name}/status – 獲取指定connector的狀態,包括它是否在執行、停止、或者失敗,如果發生錯誤,還會列出錯誤的具體資訊。
- GET /connectors/{name}/tasks – 獲取指定connector正在執行的task。
- GET /connectors/{name}/tasks/{taskid}/status – 獲取指定connector的task的狀態資訊
- PUT /connectors/{name}/pause – 暫停connector和它的task,停止資料處理知道它被恢復。
- PUT /connectors/{name}/resume – 恢復一個被暫停的connector
- POST /connectors/{name}/restart – 重啟一個connector,尤其是在一個connector執行失敗的情況下比較常用
- POST /connectors/{name}/tasks/{taskId}/restart – 重啟一個task,一般是因為它執行失敗才這樣做。
- DELETE /connectors/{name} – 刪除一個connector,停止它的所有task並刪除配置。