
Using Flume to read messages from Kafka into Hive

This example uses three of our company's servers: 192.168.2.250 (master), 192.168.2.251, and 192.168.2.252.

Reference blog post: http://miximixi.me/index.php/archives/961

1. Install Kafka, Flume, Hive, and ZooKeeper.

2. On the master host (192.168.2.250), edit flume/conf/flume.conf:

#source: skafka sink: shive channel: k2h
agent.sources = skafka
agent.sinks = shive
agent.channels = k2h

#k2h --> shive
agent.sinks.shive.channel = k2h
#skafka --> k2h
agent.sources.skafka.channels = k2h

#describe the source
agent.sources.skafka.type = org.apache.flume.source.kafka.KafkaSource
#agent.sources.skafka.zookeeper.connect = 192.168.2.250:2181,192.168.2.251:2181,192.168.2.252:2181
# Note: the ZooKeeper connection and the topic must be set correctly,
# otherwise Flume fails with a "topic does not exist" error
agent.sources.skafka.zookeeperConnect = 192.168.2.250:2181,192.168.2.251:2181,192.168.2.252:2181
agent.sources.skafka.batchSize = 5000
agent.sources.skafka.batchDurationMillis = 2000
#agent.sources.skafka.kafka.bootstrap.servers = 192.168.2.251:6667                                         
#agent.sources.skafka.kafka.topic = flume2hive
#agent.sources.skafka.kafka.consumer.group.id = flume2hive_group
agent.sources.skafka.topic = flumetest
agent.sources.skafka.groupId = flumetest

#use a channel which buffers events in memory
agent.channels.k2h.type = memory
agent.channels.k2h.capacity = 1000
#maximum number of events per transaction, whether taken from a source or delivered to a sink
agent.channels.k2h.transactionCapacity = 100

#sink type: hive
agent.sinks.shive.type = hive
agent.sinks.shive.channel = k2h
agent.sinks.shive.hive.metastore = thrift://hadoop04.ltyicloud.com:9083
agent.sinks.shive.hive.database = userdb
agent.sinks.shive.hive.table = flumetest
#agent.sinks.shive.hive.partition = asia,%{country},%y-%m-%d-%H-%M
#agent.sinks.shive.hive.partition = %y-%m-%d
agent.sinks.shive.useLocalTimeStamp = false
#agent.sinks.shive.round = true
#agent.sinks.shive.roundValue = 10
#agent.sinks.shive.roundUnit = minute
agent.sinks.shive.serializer = JSON
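Before starting the agent it is worth double-checking that the source, channel, and sink are wired to each other; a mismatched channel name is a common silent failure. A minimal Python sketch of that sanity check (the relevant lines of the config above are embedded as a string; this is an illustration, not part of Flume):

```python
# Sanity-check the Flume wiring: the sink must read from a channel
# that the source actually feeds. Config lines embedded for the example.
config = """
agent.sources = skafka
agent.sinks = shive
agent.channels = k2h
agent.sinks.shive.channel = k2h
agent.sources.skafka.channels = k2h
"""

props = {}
for line in config.strip().splitlines():
    key, _, value = line.partition("=")
    props[key.strip()] = value.strip()

# A source may list several channels (space-separated); a sink has exactly one.
source_channels = props["agent.sources.skafka.channels"].split()
sink_channel = props["agent.sinks.shive.channel"]
assert sink_channel in source_channels, "sink reads a channel no source feeds"
print("wiring ok:", sink_channel)
```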

3. Create a transactional table in Hive. (Some people have reportedly made this work without transactions; I could never get that to work, so a transactional table is used here.)

The table name must match agent.sinks.shive.hive.table from the config above (flumetest), and the Hive sink requires a bucketed ORC table with transactions enabled:

create table flumetest (id int, name string)
clustered by (id) into 8 buckets
stored as orc
TBLPROPERTIES ('transactional'='true');
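Transactional tables also need Hive's ACID support switched on, or inserts into them will fail. A sketch of the settings typically required (these are standard Hive 2.x properties, but verify them against your installation; they usually belong in hive-site.xml rather than a session):

```sql
-- Hedged sketch: typical configuration needed before ACID writes work.
SET hive.support.concurrency = true;
SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
-- Compaction must run on the metastore side for ACID tables:
SET hive.compactor.initiator.on = true;
SET hive.compactor.worker.threads = 1;
```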

4. Start a Kafka console producer. Note that the topic must match agent.sources.skafka.topic from the config above (flumetest):

./kafka-console-producer.sh --broker-list 192.168.2.200:9092 --topic flumetest

Send a JSON record whose keys match the Hive column names, e.g. {"id": 2, "name": "zhangsan"}
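The Hive sink's JSON serializer maps top-level JSON keys to table columns, so the payload must be valid JSON and its keys must match the column names exactly. A minimal Python sketch of building such a record (stdlib only; the column names come from the table created above):

```python
import json

# Build one event for the Hive JSON serializer: top-level keys must match
# the Hive column names (id int, name string).
record = {"id": 2, "name": "zhangsan"}
payload = json.dumps(record)
print(payload)  # the line you would paste into the console producer
```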

5. Start Flume:

bin/flume-ng agent -n agent -c conf -f conf/flume.conf -Dflume.root.logger=INFO,console

If startup fails with missing jar errors, point Flume at the Hive 2 lib directory:

bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name agent -Dflume.root.logger=INFO,console --classpath "/usrp/current/hive-server2-hive2b/*"