Kafka實戰:從RDBMS到Hadoop,七步實現實時傳輸
本文是關於Flume成功應用Kafka的研究案例,深入剖析它是如何將RDBMS實時資料流匯入到HDFS的Hive表中。
對於那些想要把資料快速攝取到Hadoop中的企業來講,Kafka是一個很好的選擇。Kafka是什麼?Kafka是一個分散式、可伸縮、可信賴的訊息傳遞系統,利用釋出-訂閱模型來整合應用程式/資料流。同時,Kafka還是Hadoop技術堆疊中的關鍵元件,能夠很好地支援實時資料分析或者貨幣化的物聯網資料。
本文服務於技術人群。下面就圖解Kafka是如何把資料流從RDBMS(關係資料庫管理系統)匯入Hive,並且成功支援了一個實時分析案例。作為參考,本文中使用的元件版本分別為Hive 1.2.1,Flume 1.6 以及 Kafka 0.9。
Kafka所在位置:解決方案的整體結構
下圖顯示瞭解決方案的整體結構:Kafka和Flume的結合,再加上Hive的事務特性,RDBMS的事務資料被成功傳遞到目標Hive表中。
七步實現Hadoop實時資料匯入
現在讓我們深入方案細節,並展示如何在幾個步驟內將資料流匯入Hadoop。
1.從RDBMS中提取資料
所有關係型資料庫都有一個日誌檔案,用來記錄最新的事務。解決方案的第一步就是獲取這些事務資料,同時要確保這些資料格式是可以被Hadoop所接受的。
2.設定Kafka訊息傳送端
釋出Kafka話題訊息的程序稱為“訊息傳送端”。“話題”裡有各種Kafka所需要維護的資訊類別,RDBMS資料也會被轉換成Kafka話題。對於這個示例,要求設定一個服務於整個銷售團隊的資料庫,且該資料庫中的事務資料均以Kafka話題形式釋出。以下步驟都需要設定Kafka訊息傳送端:
$ cd /usr/hdp/2.4.0.0-169/kafka
$ bin/kafka-topics.sh --create --zookeeper sandbox.hortonworks.com:2181 --replication-factor 1 --partitions 1 --topic SalesDBTransactions
Created topic "SalesDBTransactions".
$ bin/kafka-topics.sh --list --zookeeper sandbox.hortonworks.com:2181
SalesDBTransactions
3.設定Hive
接下來將建立一個Hive表,準備接收銷售團隊的資料庫事務。這個例子中,我們將建立一個使用者資料表:
[[email protected] ~]$ beeline -u jdbc:hive2:// -n hive -p hive
0: jdbc:hive2://> use raj;
create table customers (id string, name string, email string, street_address string, company string)
partitioned by (time string)
clustered by (id) into 5 buckets stored as orc
location '/user/bedrock/salescust'
TBLPROPERTIES ('transactional'='true');
為了確保Hive能夠有效處理事務資料,以下設定要求在Hive配置中進行:
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
4.為Kafka到Hive的資料流設定Flume代理
現在來看下如何建立一個Flume代理,用於收集Kafka話題資料並向Hive表傳送資料。
在啟用Flume代理前,要通過這幾個步驟設定執行環境:
$ pwd
/home/bedrock/streamingdemo
$ mkdir flume/checkpoint
$ mkdir flume/data
$ chmod 777 -R flume
$ export HIVE_HOME=/usr/hdp/current/hive-server2
$ export HCAT_HOME=/usr/hdp/current/hive-webhcat
$ pwd
/home/bedrock/streamingdemo/flume
$ mkdir logs
再如下所示建立一個log4j屬性檔案:
[[email protected] conf]$ vi log4j.properties
flume.root.logger=INFO,LOGFILE
flume.log.dir=/home/bedrock/streamingdemo/flume/logs
flume.log.file=flume.log
然後為Flume代理配置以下檔案:
$ vi flumetohive.conf
flumeagent1.sources = source_from_kafka
flumeagent1.channels = mem_channel
flumeagent1.sinks = hive_sink
# Define / Configure source
flumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumeagent1.sources.source_from_kafka.zookeeperConnect = sandbox.hortonworks.com:2181
flumeagent1.sources.source_from_kafka.topic = SalesDBTransactions
flumeagent1.sources.source_from_kafka.groupID = flume
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sources.source_from_kafka.interceptors = i1
flumeagent1.sources.source_from_kafka.interceptors.i1.type = timestamp
flumeagent1.sources.source_from_kafka.consumer.timeout.ms = 1000
# Hive Sink
flumeagent1.sinks.hive_sink.type = hive
flumeagent1.sinks.hive_sink.hive.metastore = thrift://sandbox.hortonworks.com:9083
flumeagent1.sinks.hive_sink.hive.database = raj
flumeagent1.sinks.hive_sink.hive.table = customers
flumeagent1.sinks.hive_sink.hive.txnsPerBatchAsk = 2
flumeagent1.sinks.hive_sink.hive.partition = %y-%m-%d-%H-%M
flumeagent1.sinks.hive_sink.batchSize = 10
flumeagent1.sinks.hive_sink.serializer = DELIMITED
flumeagent1.sinks.hive_sink.serializer.delimiter = ,
flumeagent1.sinks.hive_sink.serializer.fieldnames = id,name,email,street_address,company
# Use a channel which buffers events in memory
flumeagent1.channels.mem_channel.type = memory
flumeagent1.channels.mem_channel.capacity = 10000
flumeagent1.channels.mem_channel.transactionCapacity = 100
# Bind the source and sink to the channel
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sinks.hive_sink.channel = mem_channel
5.啟用Flume代理
通過以下指令啟用Flume代理:
$ /usr/hdp/apache-flume-1.6.0/bin/flume-ng agent -n flumeagent1 -f ~/streamingdemo/flume/conf/flumetohive.conf
6.啟用Kafka流
作為示例下面是一個模擬事務的訊息集,這在實際系統中需要通過源資料庫才能生成。例如,以下可能來自Oracle流,在回放被提交到資料庫的SQL事務,也可能來自GoldenGate。
$ cd /usr/hdp/2.4.0.0-169/kafka
$ bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic SalesDBTransactions
1,"Nero Morris","porttitor.interdum@Sedcongue.edu","P.O. Box 871, 5313 Quis Ave","Sodales Company"
2,"Cody Bond","ante.lectus.convallis@antebibendumullamcorper.ca","232-513 Molestie Road","Aenean Eget Magna Incorporated"
3,"Holmes Cannon","a@metusAliquam.edu","P.O. Box 726, 7682 Bibendum Rd.","Velit Cras LLP"
4,"Alexander Lewis","risus@urna.edu","Ap #375-9675 Lacus Av.","Ut Aliquam Iaculis Inc."
5,"Gavin Ortiz","sit.amet@aliquameu.net","Ap #453-1440 Urna. St.","Libero Nec Ltd"
6,"Ralph Fleming","sociis.natoque.penatibus@quismassaMauris.edu","363-6976 Lacus. St.","Quisque Fringilla PC"
7,"Merrill Norton","at.sem@elementum.net","P.O. Box 452, 6951 Egestas. St.","Nec Metus Institute"
8,"Nathaniel Carrillo","eget@massa.co.uk","Ap #438-604 Tellus St.","Blandit Viverra Corporation"
9,"Warren Valenzuela","tempus.scelerisque.lorem@ornare.co.uk","Ap #590-320 Nulla Av.","Ligula Aliquam Erat Incorporated"
10,"Donovan Hill","facilisi@augue.org","979-6729 Donec Road","Turpis In Condimentum Associates"
11,"Kamal Matthews","augue.ut@necleoMorbi.org","Ap #530-8214 Convallis, St.","Tristique Senectus Et Foundation"
7.接收Hive資料
如果上面所有的步驟都完成了,那麼現在就可以從Kafka傳送資料,可以看到資料流在幾秒鐘內就會被髮送到Hive表。