替代Flume——Kafka Connect簡介
我們知道過去對於Kafka的定義是分散式,分割槽化的,帶備份機制的日誌提交服務。也就是一個分散式的訊息佇列,這也是他最常見的用法。但是Kafka不止於此,開啟最新的官網。
我們看到Kafka最新的定義是:Apache Kafka® is a distributed streaming platform
分散式流處理平臺。
這裡也清晰的描述了Kafka的特點:Kafka用於構建實時資料管道和流式應用程式。它具有水平可擴充套件性、容錯性、速度極快,並在數千家公司投入生產。
所以現在的Kafka已經不僅是一個分散式的訊息佇列,更是一個流處理平臺。這源於它於0.9.0.0和0.10.0.0引入的兩個全新的元件Kafka Connect與Kafka Streaming。
Kafka Connect簡介
我們知道訊息佇列必須存在上下游的系統,對訊息進行搬入搬出。比如經典的日誌分析系統,通過flume讀取日誌寫入kafka,下游由storm進行實時的資料處理。
Kafka Connect的作用就是替代Flume,讓資料傳輸這部分工作可以由Kafka Connect來完成。Kafka Connect是一個用於在Apache Kafka和其他系統之間可靠且可靠地傳輸資料的工具。它可以快速地將大量資料集合移入和移出Kafka。
Kafka Connect的匯入作業可以將資料庫或從應用程式伺服器收集的資料傳入到Kafka,匯出作業可以將Kafka中的資料傳遞到查詢系統,也可以傳輸到批處理系統以進行離線分析。
Kafka Connect功能包括:
- 一個通用的Kafka連線的框架 - Kafka Connect規範化了其他資料系統與Kafka的整合,簡化了聯結器開發,部署和管理
- 分散式和獨立模式 - 支援大型分散式的管理服務,也支援小型生產環境的部署
- REST介面 - 通過易用的REST API提交和管理Kafka Connect
- 自動偏移管理 - 只需從聯結器獲取一些資訊,Kafka Connect就可以自動管理偏移量提交過程,因此聯結器開發人員無需擔心聯結器開發中偏移量提交這部分的開發
- 預設情況下是分散式和可擴充套件的 - Kafka Connect構建在現有的組管理協議之上。可以新增擴充套件叢集
- 流媒體/批處理整合 - 利用Kafka現有的功能,Kafka Connect是橋接流媒體和批處理資料系統的理想解決方案
執行Kafka Connect
Kafka Connect目前支援兩種執行模式:獨立和叢集。
獨立模式
在獨立模式下,只有一個程序,這種更容易設定和使用。但是沒有容錯功能。
啟動:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
獨立模式配置
第一個引數config/connect-standalone.properties是一些基本的配置:
這幾個在獨立和叢集模式下都需要設定:
#bootstrap.servers kafka叢集列表
bootstrap.servers=localhost:9092
#key.converter key的序列化轉換器 比如json的 key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter value的序列化轉換器
value.converter=org.apache.kafka.connect.json.JsonConverter
#獨立模式特有的配置:
#offset.storage.file.filename 用於儲存偏移量的檔案
offset.storage.file.filename =/home/kafka/connect.offsets
獨立模式聯結器配置(配置檔案)
後面的引數connector1.properties [connector2.properties ...] 可以多個,是聯結器配置內容
這裡我們配置一個從檔案讀取資料並存入kafka的配置:
connect-file-sink.properties
name
- 聯結器的唯一名稱。嘗試再次使用相同名稱註冊將失敗。connector.class
- 聯結器的Java類 此聯結器的類的全名或別名。這裡我們選擇FileStreamSinktasks.max
- 應為此聯結器建立的最大任務數。如果聯結器無法達到此級別的並行性,則可能會建立更少的任務。key.converter
- (可選)覆蓋worker設定的預設金鑰轉換器。value.converter
- (可選)覆蓋worker設定的預設值轉換器。下面兩個必須設定一個:
topics
- 以逗號分隔的主題列表,用作此聯結器的輸入topics.regex
- 用作此聯結器輸入的主題的Java正則表示式
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
可以在聯結器中配置轉換器
需要指定引數:
transforms
- 轉換的別名列表,指定將應用轉換的順序。transforms.$alias.type
- 轉換的完全限定類名。transforms.$alias.$transformationSpecificConfig
轉換的配置屬性
例如,我們把剛才的檔案轉換器的內容新增欄位
首先設定connect-standalone.properties
key.converter.schemas.enable = false
value.converter.schemas.enable = false
設定connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
transforms=MakeMap, InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source
沒有轉換前的結果:
"foo"
"bar"
"hello world"
轉換後:
{"line":"foo","data_source":"test-file-source"}
{"line":"bar","data_source":"test-file-source"}
{"line":"hello world","data_source":"test-file-source"}
常用轉換型別:
- InsertField - 使用靜態資料或記錄元資料新增欄位
- ReplaceField - 過濾或重新命名欄位
- MaskField - 用型別的有效空值替換欄位(0,空字串等)
- ValueToKey Value轉換為Key
- HoistField - 將整個事件作為單個欄位包裝在Struct或Map中
- ExtractField - 從Struct和Map中提取特定欄位,並在結果中僅包含此欄位
- SetSchemaMetadata - 修改架構名稱或版本
- TimestampRouter - 根據原始主題和時間戳修改記錄主題
- RegexRouter - 根據原始主題,替換字串和正則表示式修改記錄主題
叢集模式
叢集模式下,可以擴充套件,容錯。
啟動:
> bin/connect-distributed.sh config/connect-distributed.properties
在叢集模式下,Kafka Connect在Kafka主題中儲存偏移量,配置和任務狀態。
叢集模式配置
connect-distributed.properties
#也需要基本的配置
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#還有一些配置要注意
#group.id(預設connect-cluster) - Connect的組id 請注意,這不得與使用者的組id 衝突
group.id=connect-cluster
#用於儲存偏移的主題; 此主題應具有許多分割槽
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#用於儲存聯結器和任務配置的主題 只能一個分割槽
config.storage.topic=connect-configs
config.storage.replication.factor=1
#用於儲存狀態的主題; 此主題可以有多個分割槽
status.storage.topic=connect-status
status.storage.replication.factor=1
在叢集模式下,配置並不會在命令列傳進去,而是需要REST API來建立,修改和銷燬聯結器。
叢集模式聯結器配置(REST API)
可以配置REST API伺服器,支援http與https
listeners=http://localhost:8080,https://localhost:8443
預設情況下,如果未listeners
指定,則REST伺服器使用HTTP協議在埠8083上執行。
以下是當前支援的REST API:
GET /connectors
- 返回活動聯結器列表POST /connectors
- 建立一個新的聯結器; 請求主體應該是包含字串name
欄位的JSON物件和包含config
聯結器配置引數的物件欄位GET /connectors/{name}
- 獲取有關特定聯結器的資訊GET /connectors/{name}/config
- 獲取特定聯結器的配置引數PUT /connectors/{name}/config
- 更新特定聯結器的配置引數GET /connectors/{name}/status
- 獲取聯結器的當前狀態,包括它是否正在執行,失敗,暫停等,分配給哪個工作人員,錯誤資訊(如果失敗)以及所有任務的狀態GET /connectors/{name}/tasks
- 獲取當前為聯結器執行的任務列表GET /connectors/{name}/tasks/{taskid}/status
- 獲取任務的當前狀態,包括它是否正在執行,失敗,暫停等,分配給哪個工作人員,以及錯誤資訊是否失敗PUT /connectors/{name}/pause
- 暫停聯結器及其任務,這將停止訊息處理,直到恢復聯結器PUT /connectors/{name}/resume
- 恢復暫停的聯結器(如果聯結器未暫停,則不執行任何操作)POST /connectors/{name}/restart
- 重新啟動聯結器(通常是因為它已經失敗)POST /connectors/{name}/tasks/{taskId}/restart
- 重啟個別任務(通常因為失敗)DELETE /connectors/{name}
- 刪除聯結器,暫停所有任務並刪除其配置
聯結器開發指南
kakfa允許開發人員自己去開發一個聯結器。
核心概念
要在Kafka和其他系統之間複製資料,使用者需要建立一個Connector
Connector有兩種形式:
SourceConnectors
從另一個系統匯入資料,例如,JDBCSourceConnector
將關係資料庫匯入Kafka
SinkConnectors
匯出資料,例如,HDFSSinkConnector
將Kafka主題的內容匯出到HDFS檔案
和對應的Task:
SourceTask
和SinkTask
Task形成輸入輸出流,開發Task要注意偏移量的問題。
每個流應該是一系列鍵值記錄。還需要定期提交已處理的資料的偏移量,以便在發生故障時,處理可以從上次提交的偏移量恢復。Connector還需要是動態的,實現還負責監視外部系統是否存在任何更改。
開發一個簡單的聯結器
開發聯結器只需要實現兩個介面,即Connector
和Task
。
這裡我們簡單開發一個FileStreamConnector。
此聯結器是為在獨立模式下使用,SourceConnector/
SourceTask讀取檔案的每一行,
SinkConnector/
SinkTask每個記錄寫入一個檔案。
聯結器示例:
繼承SourceConnector,新增欄位(要讀取的檔名和要將資料傳送到的主題)
public class FileStreamSourceConnector extends SourceConnector {
private String filename;
private String topic;
定義實際讀取資料的類
@Override
public Class<? extends Task> taskClass() {
return FileStreamSourceTask.class;
}
在FileStreamSourceTask
下面定義該類。接下來,我們新增一些標準的生命週期方法,start()
和stop()
@Override
public void start(Map<String, String> props) {
// The complete version includes error handling as well.
filename = props.get(FILE_CONFIG);
topic = props.get(TOPIC_CONFIG);
}
@Override
public void stop() {
// Nothing to do since no background monitoring is required.
}
最後,實施的真正核心在於taskConfigs()
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
// Only one input stream makes sense.
Map<String, String> config = new HashMap<>();
if (filename != null)
config.put(FILE_CONFIG, filename);
config.put(TOPIC_CONFIG, topic);
configs.add(config);
return configs;
}
任務示例:
源任務
實現SourceTask
建立FileStreamSourceTask繼承SourceTask
public class FileStreamSourceTask extends SourceTask {
String filename;
InputStream stream;
String topic;
@Override
public void start(Map<String, String> props) {
filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
stream = openOrThrowError(filename);
topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
}
@Override
public synchronized void stop() {
stream.close();
}
接下來,我們實現任務的主要功能,即poll()
從輸入系統獲取事件並返回以下內容的方法List
:
@Override
public List<SourceRecord> poll() throws InterruptedException {
try {
ArrayList<SourceRecord> records = new ArrayList<>();
while (streamValid(stream) && records.isEmpty()) {
LineAndOffset line = readToNextLine(stream);
if (line != null) {
Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
} else {
Thread.sleep(1);
}
}
return records;
} catch (IOException e) {
// Underlying stream was killed, probably as a result of calling stop. Allow to return
// null, and driving thread will handle any shutdown if necessary.
}
return null;
}
接收任務
不像SourceConnector
和SinkConnector
,SourceTask
並SinkTask
有非常不同的介面,因為SourceTask
採用的是拉介面,並SinkTask
使用推介面。兩者共享公共生命週期方法,但SinkTask
完全不同:
public abstract class SinkTask implements Task {
public void initialize(SinkTaskContext context) {
this.context = context;
}
public abstract void put(Collection<SinkRecord> records);
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
}
這是一個簡單的例子,它們有簡單的結構化資料 - 每一行只是一個字串。幾乎所有實用的聯結器都需要具有更復雜資料格式的模式。要建立更復雜的資料,您需要使用Kafka Connect data
API。
Schema schema = SchemaBuilder.struct().name(NAME)
.field("name", Schema.STRING_SCHEMA)
.field("age", Schema.INT_SCHEMA)
.field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
.build();
Struct struct = new Struct(schema)
.put("name", "Barbara Liskov")
.put("age", 75);
更多Kafka相關技術文章:
什麼是Kafka?
Kafka監控工具彙總
Kafka快速入門
Kafka核心之Consumer
Kafka核心之Producer
更多實時計算,Flink,Kafka等相關技術博文,歡迎關注實時流式計算
相關推薦
替代Flume——Kafka Connect簡介
我們知道過去對於Kafka的定義是分散式,分割槽化的,帶備份機制的日誌提交服務。也就是一個分散式的訊息佇列,這也是他最常見的用法。但是Kafka不止於此,開啟最新的官網。 我們看到Kafka最新的定義是:Apache Kafka® is a distributed streaming pla
kafka connect簡介以及部署
1、什麼是kafka connect? 根據官方介紹,Kafka Connect是一種用於在Kafka和其他系統之間可擴充套件的、可靠的流式傳輸資料的工具。它使得能夠快速定義將大量資料集合移入和
Kafka Connect Details 詳解
nec get json格式 err pos print sta 參考 document 目錄 1. Kafka Connect Details 詳解 1.1. 概覽 1.2. 啟動和配置 1.2.1. Standalone 單機模式 1.2.2. Distrib
kafka-connect-hdfs連接hadoop hdfs時候,竟然是單點的,太可怕了。。。果斷改成HA
olt author vax confluent del ... cond exceptio client 2017-08-16 11:57:28,237 WARN [org.apache.hadoop.hdfs.LeaseRenewer][458] - <Fail
kafka-connect-hdfs重啟,進去RECOVERY狀態,從hadoop hdfs拿租約,很正常,但是也太久了吧
not handle 個人 ret sun response sass pre sub 雖說這個算是正常現象,等的時間也太久了吧。分鐘級了。這個RECOVERY裏面的WAL有點多余。有這麽久的時間,早從新讀取kafka寫入hdfs了。純屬個人見解。 @SuppressWa
[Flume][Kafka]Flume 與 Kakfa結合例子(Kakfa 作為flume 的sink 輸出到 Kafka topic)
over ins log etc avro channels xmx rri pan Flume 與 Kakfa結合例子(Kakfa 作為flume 的sink 輸出到 Kafka topic) 進行準備工作: $sudo mkdir -p /flume/web_s
Flume+Kafka+Zookeeper搭建大數據日誌采集框架
flume+kafka+zookeeper1. JDK的安裝 參考jdk的安裝,此處略。2. 安裝Zookeeper 參考我的Zookeeper安裝教程中的“完全分布式”部分。3. 安裝Kafka 參考我的Kafka安裝教程中的“完全分布式搭建”部分。4. 安裝Flume 參考
flume+kafka+hdfs詳解
utf-8 conf prop nts command format ext sink 1.4 flume架構圖 單節點flume配置 flume-1.4.0 啟動flume bin/flume-ng agent --conf ./conf -f conf/flume
Flume+Kafka+Storm+Redis構建大數據實時處理系統:實時統計網站PV、UV+展示
大數據 實時計算 Storm [TOC] 1 大數據處理的常用方法 前面在我的另一篇文章中《大數據采集、清洗、處理:使用MapReduce進行離線數據分析完整案例》中已經有提及到,這裏依然給出下面的圖示: 前面給出的那篇文章是基於MapReduce的離線數據分析案例,其通過對網站產生的用戶訪問
基於Flume+Kafka+Spark Streaming打造實時流處理項目實戰課程
大數據本課程從實時數據產生和流向的各個環節出發,通過集成主流的分布式日誌收集框架Flume、分布式消息隊列Kafka、分布式列式數據庫HBase、及當前最火爆的Spark Streaming打造實時流處理項目實戰,讓你掌握實時處理的整套處理流程,達到大數據中級研發工程師的水平!下載地址:百度網盤下載
Flume+Kafka+SparkStreaming+Hbase+可視化(一)
日誌導入 ash channels style 導入 com system ase spark 一、前置準備: Linux命令基礎 Scala、Python其中一門 Hadoop、Spark、Flume、Kafka、Hbase基礎知識 二、分布式日誌收集框架Flume
Kafka基礎簡介
err 日誌 class put 介紹 分享 頻率 actor oid kafka是一個分布式的,可分區的,可備份的日誌提交服務,它使用獨特的設計實現了一個消息系統的功能。 由於最近項目升級,需要將spring的事件機制轉變為消息機制,針對後期考慮,選擇了kafka作為消息
Kafka Connect 日誌配置
筆者之前啟動kafka connect程序後,發現過一段時間,日誌太大。 啟動kafka connect的命令如下: cd /home/tidb/confluent-4.1.1 ./bin/connect-distributed -daemon ./etc/schema-regis
SparkStreaming(14):log4j日誌-flume-kafka-SparkStreaming的整合
一、功能實現 模擬log4j的日誌生產,將日誌輸出到flume伺服器。然後,通過flume將日誌資訊輸出到kafka,進而Streaming可以從kafka獲得日誌,並且進行簡單的處理。 二、步驟 1.目的: 使用log4j將日誌輸按照一定格式輸出,並且傳遞給flume伺服器特定埠接
Kafka學習之路 (一)Kafka的簡介
要求 異步通信 images 等等 ron 服務器角色 消費 消息 崩潰 一、簡介 1.1 概述 Kafka是最初由Linkedin公司開發,是一個分布式、分區的、多副本的、多訂閱者,基於zookeeper協調的分布式日誌系統(也可以當做MQ系統),常見可以用於web/ng
Kafka之簡介
1.什麼是kafka? 1.1入門 1.1.1簡介 kafka是LinkedIn開源的一個分散式MQ系統,現在是Apache的孵化專案。 Kafka is a distributed,partitioned,replicated commit logservice ,在
Kafka學習筆記(1)----Kafka的簡介和Linux下單機安裝
1. Kafka簡介 Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似於JMS的特性,但是在設計實現上完全不同,此外它並不是JMS規範的實現。kafka對訊息儲存時根據Topic進行歸類,傳送訊息者成為Produ
Apache flume+Kafka獲取實時日誌資訊
Flume簡介以及安裝 Flume是一個分散式的對海量日誌進行採集,聚合和傳輸的系統。Flume系統分為三個元件,分別是source,sink,channel:source表明資料的來源,可能來自檔案,Avro等,channel作為source和sink的橋樑,作為資料的臨時儲存地,channal是
kafka connect 使用說明
利用 消息處理 設置 specified 沒有 opc sets 自動化 max KAFKA CONNECT 使用說明 一、概述 kafka connect 是一個可擴展的、可靠的在kafka和其他系統之間流傳輸的數據工具。簡而言之就是他可以通過Connector(連接器)
深入理解 Kafka Connect:轉換器和序列化
原文連結:https://blog.csdn.net/D55dffdh/article/details/82423831 AI 前線導讀:Kafka Connect 是一個簡單但功能強大的工具,可用於 Kafka 和其他系統之間的整合。人們對 Kafka Connect 最常見的誤解之一