flume讀取binlog與kafka整合
一、現將kafka調通
檢視zookeeper的topic
cd /usr/software/zookeeper/zookeeper/bin
./zkCli.sh start
ls /brokers/topics
先來說一下,刪除kafka無用topic
./kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic test
如果按照上述走是可以看到訊息的消費的。
啟動kafka
bin/kafka-server-start.sh config/server.properties
建立話題
./kafka-create-topic.sh -partition 1 -replica 1 -zookeeper centos1:2181 -topic test
看一下話題
./kafka-list-topic.sh -zookeeper centos1:2181
開啟producer話題
./kafka-console-producer.sh -broker-list centos1:9092 -topic test
開啟consumer
./kafka-console-consumer.sh -zookeeper centos1:2181 -topic test
二、再將flume調通
bin/flume-ng agent -c conf -f conf/flume-conf.properties -n sync &
-c:表示配置檔案的目錄,在此我們配置了flume-env.sh,也在conf目錄下;
-f:指定配置檔案,這個配置檔案必須在全域性選項的--conf引數定義的目錄下,就是說這個配置檔案要在前面配置的conf目錄下面;
-n:表示要啟動的agent的名稱,也就是我們flume.properties配置檔案裡面,配置項的字首,這裡我們配的字首是【sync】;
結果遇到報錯:
[[email protected] flume]# Info: Sourcing environment configuration script /usr/software/flume/conf/flume-env.sh
Info: Including Hive libraries found via () for Hive access
+ exec /opt/jdk1.8.0_181/bin/java -Xms100m -Xmx200m -Dcom.sun.management.jmxremote -cp '/usr/software/flume/conf:/usr/local/flume/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application -f conf/flume-conf.properties -n sync
錯誤: 找不到或無法載入主類 org.apache.flume.node.Application
flume啟動Could not find or load main class org.apache.flume.node.Application
修改flume的資料夾名稱後,啟動flume可能會失敗,錯誤資訊如下:
Error: Could not find or load main class org.apache.flume.node.Application
這個是因為環境變數的問題。 export看一下是不是有個FLUME_HOME的環境變數指向原來的資料夾,
如果是的話:
果然是環境變數配置錯了,然後修改過後
source /etc/profile
此時我們向mysql表中開始插入資料發現consumer的客戶端中,沒有消費記錄。有以下截圖為準。
原因是日誌報錯。我們可以看一下flume日誌。
flume的日誌檔案配置不用我說了吧。
原因是缺少hibernate的配置項,看一下配置檔案,果然。
所以現在需要修改,參考文件修改吧。
我們先來看看配置檔案的寫法
a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://centos1:3306/hr
# Hibernate Database connection properties
a1.sources.src-1.hibernate.connection.user = root
a1.sources.src-1.hibernate.connection.password =
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.table = ef_arap
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay=10000
a1.sources.src-1.status.file.path = /usr/software/flume/logs
a1.sources.src-1.status.file.name = sqlSource.status
# Custom query
a1.sources.src-1.columns.to.select = *
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10
##############################
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactionCapacity = 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = centos1:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
a1.sinks.k1.channel = ch-1
a1.sources.src-1.channels=ch-1
然後我們首先jps殺死以前的flume程序。
緊接著我們啟動。
./bin/flume-ng agent -c conf -f conf/flume-conf.properties -n a1 &
現在可以看到日誌是成功的。
可以看到consumer端已經收到了。
超級激動啊!但是很快日誌就有報錯了。
我們看一下到底是什麼報錯吧。
又有報錯。
Cannot commit transaction. Byte capacity allocated to store event body 640000.0reached. Please increase heap space/byte capacity allocated to the channel as the sinks may not be keeping up with the sources
這個錯誤應該是調優的問題了。
沒有問題了。
參考文章:
kafka
https://www.cnblogs.com/xiaodf/p/6093261.html#4
flume