kafka connector的quick start
阿新 • • 發佈:2018-12-19
準備前提:
本例所有操作在linux虛擬機器環境下完成。
已經下載好kafka connector confluent。進入安裝目錄下:
第一步:start zookeeper:
bin/zookeeper-server-start etc/kafka/zookeeper.properties
第二步:start kafka broker:
bin/kafka-server-start etc/kafka/server.properties
第三步:start schema registry:
bin/schema-registry-start etc/schema-registry/schema-registry.properties
第四步:start source (or sink):本例使用standalone模式,將mysql的資料匯入進kafka中(保證mysql服務開啟並且有jdbc的驅動)。如果要匯出到某一容器即需在後面加上sink.properties的檔案(補全[]中內容,不用則不用加[]內容)。
bin/connect-standalone etc/shcema-registry/connect-avro-standalone.propertie etc/kafka-connect-jdbc/mysql-source.properties [...sink.properties]
其中:connect-avro-standalone.properties可使用預設配置;
mysql-source.properties的內容如下:
# tasks to create: name=test-mysql connector.class=io.confluent.connect.jdbc.JdbcSourceConnector tasks.max=1 # a table called 'users' will be written to the topic 'test-mysql-jdbc-users'. 自行替換自己資料庫的user和password。 connection.url=jdbc:mysql://172.24.8.114:3306/connector?user=$USER&password=$PASSWORD # 模式有timestamp和incrementing,根據自己資料庫中表的情況自行修改,本例中因為表中有自增主鍵所以選擇incrementing mode=incrementing # 自增的列名為id incrementing.column.name=id topic.prefix=test-mysql-jdbc-
第五步:如果沒有配置sink,可新開一個consumer的console檢視匯入的資料。
bin/kafka-avro-console-consumer --new-consumer --vootstrap-server localhost:9092 --topic test-mysql-jdbc-表名 --from-beginning