flume採集資料到kafka和hive
阿新 • • 發佈:2019-02-08
- 構建ftp服務
在安裝flume的機器上新增sftp服務
useradd flumetest
passwd flumetest
#ubuntu-檢視所有使用者
cat /etc/shadow
#ubuntu-安裝vsftpd
apt-get install vsftpd
#檢視
service vsftpd status
#建立接受資料目錄
mkdir /home/flumetest/alarm
在vsftpd服務配置檔案中設定:
# Allow anonymous FTP? (Disabled by default)
anonymous_enable=NO
#Uncomment this to enable any form of FTP write command.
write_enable=YES
#chroot_local_user=YES
chroot_list_enable=YES
#(default follows)
chroot_list_file=/etc/vsftpd.chroot_list
#啟動kafka,kafka節點都需要啟動
nohup sh bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
#建立topic
bin/kafka-topics.sh --create --zookeeper 116.62.*.*:2181 --replication-factor 2 --partitions 2 --topic alarm
#檢視topic List
bin/kafka-topics.sh --list --zookeeper 116.62.*.*:2181
啟動consumer,檢視資料
bin/kafka-console-consumer.sh --bootstrap-server 116.62.*.*:9092 --topic alarm --from-beginning
#刪除topic
bin/kafka-topics.sh --zookeeper 116.62.*.*:2181 --delete --topic alarm
#如果kafaka啟動時載入的配置檔案中server.properties沒有配置delete.topic.enable=true,那麼此時的刪除並不是真正的刪除,而是把topic標記為:marked for deletion,此時你若想真正刪除它,可以如下操作:
#(1)登入zookeeper客戶端:命令:./bin/zookeeper-client
#(2)找到topic所在的目錄:ls /brokers/topics
#(3)找到要刪除的topic,執行命令:rmr /brokers/topics/【topic name】即可,此時topic被徹底刪除。
3 建立hive表
create table if not EXISTS alarm_no_partition(alm_timestring,alm_timeMs int,tag_NameID string,alm_Type int,priID int,alm_Ack_Timestring,alm_Ack_TimeMs int,alm_Group int,alm_Sub_Area int,tag_Data_Typestring,alm_Ack_Flg string,alm_Remove_Flg string,alm_Remove_Timestring,alm_Remove_TimeMs int,alarm_date string) clusteredby (priID) into 2 buckets stored as orc TBLPROPERTIES("transactional"="true");
4 配置flume(source、channel、sink)
# Flume agent1: one spooling-directory source replicated to three memory
# channels, feeding a Kafka sink (alarmk1) and a Hive sink (alarmk2); an
# HBase sink (alarmk3) is stubbed out below.
agent1.sources = alarms1
agent1.channels = alarmc1 alarmc2 alarmc3
agent1.sinks = alarmk1 alarmk2 alarmk3

# Source alarms1: spooling directory. Files dropped into spoolDir are
# ingested and renamed with a .COMPLETED suffix when done.
agent1.sources.alarms1.type = spooldir
agent1.sources.alarms1.spoolDir = /home/flumetest/alarm/
# FIX: channel names must be space-separated (was "alarmc1alarmc2 alarmc3",
# which binds the source to a nonexistent channel and drops alarmc1/alarmc2).
agent1.sources.alarms1.channels = alarmc1 alarmc2 alarmc3
agent1.sources.alarms1.fileHeader = false

# Sink alarmk1: Kafka producer.
agent1.sinks.alarmk1.channel = alarmc1
agent1.sinks.alarmk1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.alarmk1.topic = alarm
# FIX: bootstrap.servers is a comma-separated host:port list; semicolons
# are not a valid separator for Kafka clients.
agent1.sinks.alarmk1.kafka.bootstrap.servers = 116.62.*.*:9092,116.62.*.*:9092,116.62.*.*:9092
agent1.sinks.alarmk1.kafka.flumeBatchSize = 20
agent1.sinks.alarmk1.kafka.producer.acks = 1
agent1.sinks.alarmk1.kafka.producer.linger.ms = 1
agent1.sinks.alarmk1.kafka.producer.compression.type = snappy

# Sink alarmk2: Hive streaming. The target table must be transactional,
# bucketed, and stored as ORC (see the CREATE TABLE statement above).
agent1.sinks.alarmk2.channel = alarmc2
agent1.sinks.alarmk2.type = hive
agent1.sinks.alarmk2.hive.metastore = thrift://127.0.0.1:9083
agent1.sinks.alarmk2.hive.database = alarm
agent1.sinks.alarmk2.hive.table = alarm_no_partition
#agent1.sinks.alarmk2.hive.partition = %{alarm_date}
agent1.sinks.alarmk2.useLocalTimeStamp = false
#agent1.sinks.alarmk2.roundValue = 10
#agent1.sinks.alarmk2.roundUnit = minute
# Incoming records are comma-delimited; serdeSeparator is the character the
# serializer uses when handing fields to the Hive SerDe.
agent1.sinks.alarmk2.serializer = DELIMITED
agent1.sinks.alarmk2.serializer.delimiter = ,
agent1.sinks.alarmk2.serializer.serdeSeparator = '\t'
agent1.sinks.alarmk2.serializer.fieldnames = alm_time,alm_timems,tag_nameid,alm_type,priid,alm_ack_time,alm_ack_timems,alm_group,alm_sub_area,tag_data_type,alm_ack_flg,alm_remove_flg,alm_remove_time,alm_remove_timems,alarm_date

# Sink alarmk3: HBase (disabled).
# NOTE(review): alarmc3 is still listed in the source's channels while its
# sink is commented out, so that channel will fill to capacity and then
# block the source — either enable alarmk3 or remove alarmc3 everywhere.
#agent1.sinks.alarmk3.channel = alarmc3
#agent1.sinks.alarmk3.type = hbase
#agent1.sinks.alarmk3.table = alarm_test
#agent1.sinks.alarmk3.columnFamily =
#agent1.sinks.alarmk3.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

# Channels: in-memory (fast, but events are lost on agent crash/restart).
agent1.channels.alarmc1.type = memory
agent1.channels.alarmc1.capacity = 1000
agent1.channels.alarmc1.transactionCapacity = 100

agent1.channels.alarmc2.type = memory
agent1.channels.alarmc2.capacity = 1000
agent1.channels.alarmc2.transactionCapacity = 100

agent1.channels.alarmc3.type = memory
agent1.channels.alarmc3.capacity = 1000
agent1.channels.alarmc3.transactionCapacity = 100
啟動flume
nohup bin/flume-ng agent -c conf -f conf/flume-conf.properties -n agent1 -Dflume.root.logger=INFO,LOGFILE -Dflume.log.dir=logs >> /dev/null 2>&1 &
5 載入資料
向目錄中新增檔案;載入完成後,Flume 會為該檔案新增 .COMPLETED 字尾。