1. 程式人生 > >kafka connector的quick start

kafka connector的quick start

準備前提:

本例所有操作在linux虛擬機器環境下完成。

已經下載好kafka connector confluent。進入安裝目錄下:

第一步:start zookeeper:

bin/zookeeper-server-start etc/kafka/zookeeper.properties

第二步:start kafka broker:

bin/kafka-server-start etc/kafka/server.properties

第三步:start schema registry:

bin/schema-registry-start etc/schema-registry/schema-registry.properties

第四步:start source (or sink):本例使用standalone模式,將mysql的資料匯入進kafka中(保證mysql服務開啟並且有jdbc的驅動)。如果要匯出到某一容器即需在後面加上sink.properties的檔案(補全[]中內容,不用則不用加[]內容)。

bin/connect-standalone etc/shcema-registry/connect-avro-standalone.propertie etc/kafka-connect-jdbc/mysql-source.properties [...sink.properties]

其中:connect-avro-standalone.properties可使用預設配置;

mysql-source.properties的內容如下:

# tasks to create: 
name=test-mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector 
tasks.max=1 
# a table called 'users' will be written to the topic 'test-mysql-jdbc-users'. 自行替換自己資料庫的user和password。
connection.url=jdbc:mysql://172.24.8.114:3306/connector?user=$USER&password=$PASSWORD 
# 模式有timestamp和incrementing,根據自己資料庫中表的情況自行修改,本例中因為表中有自增主鍵所以選擇incrementing
mode=incrementing
# 自增的列名為id
incrementing.column.name=id 
topic.prefix=test-mysql-jdbc-

第五步:如果沒有配置sink,可新開一個consumer的console檢視匯入的資料。

bin/kafka-avro-console-consumer --new-consumer --vootstrap-server localhost:9092 --topic test-mysql-jdbc-表名 --from-beginning