flume實現從kafka讀取訊息到hive
這裡以公司三臺伺服器為例:192.168.2.250(master)、192.168.2.251、192.168.2.252
參考部落格 http://miximixi.me/index.php/archives/961
1.安裝 kafka,flume,hive,zookeeper
2. 在master(192.168.2.250)主機下 vi flume/conf/flume.conf
# Flume agent "agent": Kafka source (skafka) -> memory channel (k2h) -> Hive sink (shive)
agent.sources = skafka
agent.sinks = shive
agent.channels = k2h

# Wiring: skafka --> k2h --> shive (each binding declared exactly once)
agent.sources.skafka.channels = k2h
agent.sinks.shive.channel = k2h

# ---- Source: Kafka consumer ----
agent.sources.skafka.type = org.apache.flume.source.kafka.KafkaSource
# NOTE: take care with the ZooKeeper connection and topic settings; if they are
# wrong the agent fails with a "topic does not exist" error.
# Disabled variant of the ZooKeeper setting (dotted spelling), kept for reference:
#agent.sources.skafka.zookeeper.connect = 192.168.2.250:2181,192.168.2.251:2181,192.168.2.252:2181
agent.sources.skafka.zookeeperConnect = 192.168.2.250:2181,192.168.2.251:2181,192.168.2.252:2181
# A batch is handed to the channel once batchSize events have been read or
# batchDurationMillis has elapsed, whichever comes first.
agent.sources.skafka.batchSize = 5000
agent.sources.skafka.batchDurationMillis = 2000
# Disabled alternative property names for broker/topic/group, kept for reference
# (NOTE(review): presumably for a newer Kafka source version -- confirm before enabling):
#agent.sources.skafka.kafka.bootstrap.servers = 192.168.2.251:6667
#agent.sources.skafka.kafka.topic = flume2hive
#agent.sources.skafka.kafka.consumer.group.id = flume2hive_group
agent.sources.skafka.topic = flumetest
agent.sources.skafka.groupId = flumetest

# ---- Channel: buffers events in memory ----
agent.channels.k2h.type = memory
# Total number of events the channel can hold; must be >= transactionCapacity.
agent.channels.k2h.capacity = 10000
# Maximum number of events the source may put, or the sink may take, in a single
# transaction. It must be at least as large as the source batchSize (5000) and
# the sink batchSize below; otherwise the channel rejects the batch with a
# ChannelException as soon as more than this many events are moved at once.
agent.channels.k2h.transactionCapacity = 5000

# ---- Sink: Hive streaming ----
agent.sinks.shive.type = hive
agent.sinks.shive.hive.metastore = thrift://hadoop04.ltyicloud.com:9083
agent.sinks.shive.hive.database = userdb
agent.sinks.shive.hive.table = flumetest
# Events taken from the channel per Hive transaction. Set explicitly so it never
# exceeds the channel's transactionCapacity (the sink's built-in default is larger).
agent.sinks.shive.batchSize = 5000
# Disabled partitioning examples, kept for reference:
#agent.sinks.shive.hive.partition = asia,%{country},%y-%m-%d-%H-%M
#agent.sinks.shive.hive.partition = %y-%m-%d
agent.sinks.shive.useLocalTimeStamp = false
# Disabled timestamp rounding options, kept for reference:
#agent.sinks.shive.round = true
#agent.sinks.shive.roundValue = 10
#agent.sinks.shive.roundUnit = minute
# Events are parsed as JSON objects and mapped onto the table's columns.
agent.sinks.shive.serializer = JSON
3.hive中建立帶事務的表(看別人研究的可以不用帶事務,這裡一直調不通,就用帶事務的)
-- Transactional target table for the Flume Hive sink.
-- The database and table name must match hive.database / hive.table in
-- flume.conf (userdb.flumetest); the database userdb must already exist.
-- Hive streaming ingest needs a bucketed ORC table with 'transactional'='true'.
create table userdb.flumetest (
    id   int,
    name string
)
clustered by (id) into 8 buckets
stored as orc
tblproperties ('transactional' = 'true');
4.啟動kafka生產者客戶端
# The topic must be the one the Flume Kafka source subscribes to
# (agent.sources.skafka.topic = flumetest), otherwise Flume never sees the messages.
# NOTE(review): broker 192.168.2.200:9092 is not one of the three servers listed
# at the top of this note -- confirm the broker host and port.
./kafka-console-producer.sh --broker-list 192.168.2.200:9092 --topic flumetest
傳送json串 {"id":2,"name":"zhangsan"}
5.啟動flume
# First attempt: fails during startup with missing-library errors, because the
# Hive jars are not on Flume's classpath.
bin/flume-ng agent -n agent -c conf -f conf/flume.conf -Dflume.root.logger=INFO
# Working invocation: point --classpath at the lib directory of the Hive 2
# installation. NOTE(review): the path below assumes an HDP-style layout;
# adjust it to wherever the Hive 2 jars live on your machine.
bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name agent -Dflume.root.logger=INFO,console --classpath "/usr/hdp/current/hive-server2-hive2/lib/*"