使用flume將kafka資料sink到HBase
1. hbase sink介紹
如果還不瞭解flume請檢視我寫的其他flume下的部落格。
接下來的內容主要來自flume官方文件的學習。
hbase的sink主要有以下兩種。兩種方式都提供和HBASE一樣的一致性保證,即行級原子性
1.1 HbaseSink
agent的配置時提供兩種序列化模式:
- SimpleHbaseEventSerializer: 將整個事件body部分當做完整的一列寫入hbase
- RegexHbaseEventSerializer: 根據正則表示式將event body拆分到不同的列當中
優點:
安全性較高:支援往secure hbase寫資料(hbase可以開啟kerberos校驗)
缺點:
效能沒有後面的那種AsyncHBaseSink高
1.2 AsyncHbaseSink
非同步的Sink,可見速度是比前者快的,但是不支援往Secure Hbase寫資料。
採用的序列化器是:SimpleAsyncHbaseEventSerializer,也支援將event body分割成多個列,插入到對應KEY的ROW裡
2. 配置flume
我們這裡hbase沒有開啟安全相關選項,一般這叢集也主要在內網環境。所以我們這裡採用AsyncHbaseSink來進行本次操作。source則為kafka。
channel我們也選用kafka channel。之所以選擇kafka channel的依據可以參考
配置檔案如下:
# ------------------- 定義資料流---------------------- # source的名字 agent.sources = kafkaSource # channels的名字,建議按照type來命名 agent.channels = kafkaChannel # sink的名字,建議按照目標來命名 agent.sinks = hbaseSink # ---------------------定義source和sink的繫結關係---------------- # 指定source使用的channel名字 agent.sources.kafkaSource.channels = kafkaChannel # 指定sink需要使用的channel的名字,注意這裡是channel agent.sinks.hbaseSink.channel = kafkaChannel #-------- kafkaSource相關配置----------------- # 定義訊息源型別 agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource # 定義kafka所在zk的地址 agent.sources.kafkaSource.zookeeperConnect = 10.45.9.139:2181 # 配置消費的kafka topic agent.sources.kafkaSource.topic = my-topic-test # 配置消費者組的id agent.sources.kafkaSource.groupId = flume # 消費超時時間,參照如下寫法可以配置其他所有kafka的consumer選項。注意格式從kafka.xxx開始是consumer的配置屬性 agent.sources.kafkaSource.kafka.consumer.timeout.ms = 100 #------- kafkaChannel相關配置------------------------- # channel型別 agent.channels.kafkaChannel.type = org.aprache.flume.channel.kafka.KafkaChannel # channel儲存的事件容量,即佇列長度 agent.channels.kafkaChannel.capacity=10000 # 事務容量 agent.channels.kafkaChannel.transactionCapacity=1000 # kafka broker list agent.channels.kafkaChannel.brokerList=mysql1:9092,mysql4:9092 # 指定topic agent.channels.topic=flume # 指定zk地址 agent.channels.kafkaChannel.zookeeperConnect=10.45.9.139:2181 # 指定producer的選項,關鍵是指定acks的值,保證訊息傳送的可靠性,retries採用預設的3 agent.channels.kafkaChannel.kafka.producer.acks=all #---------hbaseSink 相關配置------------------ # 指定sink型別。PS:如果使用RegexHbaseEventSerializer只能使用hbase型別 # agent.sinks.hbaseSink.type = hbase agent.sinks.hbaseSink.type = asynchbase # 指定hbase中的表名 agent.sinks.hbaseSink.table = student # 指明column family agent.sinks.hbaseSink.columnFamily= info # 使用的serializer agent.sinks.hbaseSink.serializer=org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer # 如果需要使用正則處理value可以使用以下的serializer #agent.sinks.hbaseSink.serializer=org.apache.flume.sink.hbase.RegexHbaseEventSerializer # 指定正則表示式,這裡用的正則是匹配逗號分隔的字串 #agent.sinks.hbaseSink.serializer.regex= ^([^,]+),([^,]+),([^,]+),([^,]+)$ # 指定在列族中對應的的colName # agent.sinks.hbaseSink.serializer.colNames=c1,c2,c3 # 指定hbase所用的zk集合 agent.sinks.hbaseSink.zookeeperQuorum = mysql3:2181,mysql4:2181,mysql5:2181
3. 執行測試flume
在$FLUME_HOME/bin下執行以下命令執行。後臺會開啟一個Application 的Java程序
nohup sh flume-ng agent --conf-file ../conf/flume-conf.properties --name agent -Dflume.root.logger=INFO,console &
然後寫個kafka的producer往我們前面定義的my-topic-test中寫訊息。
producer的程式碼如下:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * @author Wan Kaiming on 2016/8/1 * @version 1.0 */ public class MyProducer { public static void main(String[] args) { Properties props = new Properties(); //broker地址 這裡用域名,記得修改本地的hosts檔案 props.put("bootstrap.servers", "mysql1:9092,mysql4:9092"); //訊息可靠性語義 props.put("acks", "all"); //請求broker失敗進行重試的次數,避免由於請求broker失敗造成的訊息重複 props.put("retries", 0); //按批發送,每批的訊息數量 props.put("batch.size", 16384); //防止來不及傳送,延遲一點點時間,使得能夠批量傳送訊息 props.put("linger.ms", 1); //緩衝大小,bytes props.put("buffer.memory", 33554432); //key的序列化類 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value的序列化類 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //建立一個Producer物件,載入配置上下文資訊 Producer<String, String> producer = new KafkaProducer<String,String>(props); for(int i=0;i<6;i++){ producer.send(new ProducerRecord<String, String>("my-topic-test", "hello", "world")); } // while(true){ // //呼叫send方法進行傳送。send方法將訊息加到快取,非同步傳送 // producer.send(new ProducerRecord<String, String>("my-topic", "hello", "world")); // } producer.close(); } }
執行完畢後可以去hbase shell中檢查下:
總結:可見key全部是由flume自動生成的。傳送給kafka的 value值"world"全部成功儲存到HBASE
PS:
- 最後一行多出來的incRow是Flume的SimpleAsyncHbaseEventSerializer中使用的。用來統計行數的,每次都在最後一行,效果就是一個計數的count。
- 這裡產生的行的名字是pCol和iCol都是SimpleAsyncHbaseEventSerializer的預設值,其實可以自定義指定
總結:可見,如果需要更加自由的對寫入HBASE的資料做自定義,建議需要了解下這個Event序列化類的原始碼,然後可以自定義序列化類
4. 使用RegexHbaseEventSerializer來處理些HBASE的值
- 修改flume的配置檔案,改用RegexHbaseEventSerializer,我使用的配置檔案如下
# ------------------- 定義資料流---------------------- # source的名字 agent.sources = kafkaSource # channels的名字,建議按照type來命名 agent.channels = kafkaChannel # sink的名字,建議按照目標來命名 agent.sinks = hbaseSink # ---------------------定義source和sink的繫結關係---------------- # 指定source使用的channel名字 agent.sources.kafkaSource.channels = kafkaChannel # 指定sink需要使用的channel的名字,注意這裡是channel agent.sinks.hbaseSink.channel = kafkaChannel #-------- kafkaSource相關配置----------------- # 定義訊息源型別 agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource # 定義kafka所在zk的地址 agent.sources.kafkaSource.zookeeperConnect = 10.45.9.139:2181 # 配置消費的kafka topic agent.sources.kafkaSource.topic = my-topic-regex # 配置消費者組的id agent.sources.kafkaSource.groupId = flume # 消費超時時間,參照如下寫法可以配置其他所有kafka的consumer選項。注意格式從kafka.xxx開始是consumer的配置屬性 agent.sources.kafkaSource.kafka.consumer.timeout.ms = 100 #------- kafkaChannel相關配置------------------------- # channel型別 agent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel # channel儲存的事件容量,即佇列長度 agent.channels.kafkaChannel.capacity=10000 # 事務容量 agent.channels.kafkaChannel.transactionCapacity=1000 # kafka broker list agent.channels.kafkaChannel.brokerList=mysql1:9092,mysql4:9092 # 指定topic agent.channels.kafkaChannel.topic=flume-regex-channel # 指定zk地址 agent.channels.kafkaChannel.zookeeperConnect=10.45.9.139:2181 # 指定producer的選項,關鍵是指定acks的值,保證訊息傳送的可靠性,retries採用預設的3 # agent.channels.kafkaChannel.kafka.producer.acks=all #---------hbaseSink 相關配置------------------ # 指定sink型別 # agent.sinks.hbaseSink.type = asynchbase agent.sinks.hbaseSink.type = hbase # 指定hbase中的表名 agent.sinks.hbaseSink.table = student # 指明column family agent.sinks.hbaseSink.columnFamily = info # 使用的serializer # agent.sinks.hbaseSink.serializer=org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer # 如果需要使用正則處理value可以使用以下的serializer agent.sinks.hbaseSink.serializer= org.apache.flume.sink.hbase.RegexHbaseEventSerializer # 指定某一列來當主鍵,而不是用隨機生成的key # agent.sinks.hbaseSink.serializer.rowKeyIndex = 0 # 指定正則表示式,這裡用的正則是匹配逗號分隔的字串 agent.sinks.hbaseSink.serializer.regex=^([^,]+),([^,]+),([^,]+),([^,]+)$ # 指定在列族中對應的的colName agent.sinks.hbaseSink.serializer.colNames=c1,c2,c3,c4 # 指定hbase所用的zk集合 agent.sinks.hbaseSink.zookeeperQuorum = mysql3:2181,mysql4:2181,mysql5:2181
- 修改producer檔案,將value值傳送"one,two,three,four"可以匹配正則。執行結果如下圖:
PS:建議把lfume預設的JVM大小改大點,並且開啟JMX方便監控JVM
vi $FLUME_HOME/bin/flume-ng # set default params # 若干內容... JAVA_OPTS="-Xmx1500m -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false " LD_LIBRARY_PATH="" # 若干內容...
5. 效率測試
測試按照第四節的配置來進行。生產者程式碼如下:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Date; import java.util.Properties; /** * @author Wan Kaiming on 2016/8/1 * @version 1.0 */ public class MyProducer { public static void main(String[] args) { //統計時間 System.out.println("程式開始時間戳資訊:"+new Date()); final long startTime=System.currentTimeMillis(); Properties props = new Properties(); //broker地址 這裡用域名,記得修改本地的hosts檔案 props.put("bootstrap.servers", "mysql1:9092,mysql4:9092"); //訊息可靠性語義 props.put("acks", "all"); //請求broker失敗進行重試的次數,避免由於請求broker失敗造成的訊息重複 props.put("retries", 3); //按批發送,每批的訊息數量 //props.put("batch.size", 16384); props.put("batch.size", 16384); //防止來不及傳送,延遲一點點時間,使得能夠批量傳送訊息 props.put("linger.ms", 1); //緩衝大小,bytes props.put("buffer.memory", 33554432); //key的序列化類 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value的序列化類 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //建立一個Producer物件,載入配置上下文資訊 Producer<String, String> producer = new KafkaProducer<String,String>(props); for(int i=0;i<1000000;i++){ producer.send(new ProducerRecord<String, String>("my-topic-regex", Integer.toString(i), "one,two,three,four")); } producer.close(); final long endTime=System.currentTimeMillis(); float excTime=(float)(endTime-startTime)/1000; System.out.println("執行時間:"+excTime+"s"); System.out.println("當前時間為:"+ new Date()); } }
測試基本資訊:
名稱 | 資訊 |
---|---|
PC硬體資訊 | Intel(R) Xeon(R) CPU E7- 4830 @ 2.13GHz,記憶體4G |
JAVA | JDK1.8 |
KAFKA | 2臺broker(heap size =1200M),1個topic,5個分割槽,2個複製分割槽,acks=all |
flume | 1.60版本,heap size=1.5G,kafkachannel,kafka source,hbase sink。 |
負載資訊 | 100萬條訊息,每條訊息20byte的樣子 |
hbase | 1臺master,3臺slave,其中1臺slave和master在一臺機器,版本0.98-hadoop2 |
測試結果1(kafkachannel=0):
- kafka傳送訊息時間:6.912s
- hbase接受完全部訊息:4分33s
- 延遲時間:4分26s
測試結果2(kafkachannel=all):
- kafka傳送訊息時間:8.25s
- hbase接受完全部訊息:4分59s
- 延遲時間:4分51s
PS: 測試會存在一定誤差。因為讀取hbase的時候是按照1000條的批大小批量讀取的,count完整個HBASE的記錄本身也會花很多時間。也就是意味著,實際的延遲時間肯定比我測試的要小。測試1的時候,32.8萬條訊息,花費時間約為120s,得到吞吐量TPS=2733。該值基本比較準確。
綜上,在保證訊息可靠性前提下,kafka訊息通過flume寫hbase的吞吐量TPS基本在3K左右這個數量級。相信經過更多的配置優化、硬體效能提升、增大JVM堆等方式,提升TPS不是問題。