Flume: receive data, write it into HBase, and generate a specified rowkey and column
Goal: Flume extracts a value from each event and uses it as part of the HBase rowkey. Flume receives the data and forwards it to HBase, with the requirement that the data does not land in intermediate files along the way. Flume uses an HTTP source as the entry point and an HBase sink to import the data, while the channel persists Flume's in-memory data to local disk (so that if the cluster fails, the data still has a local backup). The incoming data has the format http:10.0.0.1_{asdasd}, i.e. url_data.
The result stored in HBase is: rowkey = current time_url, value = data. In other words, the incoming data has to be split: the url becomes one part of the rowkey, the current time the other part, and the data itself is stored in the value.
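For example, a body of http:10.0.0.1_{asdasd} received at 2018-11-08 10:40:34 would be stored with rowkey 20181108104034_http:10.0.0.1, column a:data and value {asdasd}; the concrete result is shown at the end of the walkthrough.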
Steps: 1. Implement the Flume interface that lets you control the rowkey (HbaseEventSerializer), then package it as a jar. The Java source is shown below.
2. Copy the jar into Flume's /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/lib directory. 3. Flume configuration file:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.bind = 10.0.0.183

# Describe the sink
a1.sinks.k1.type = hbase
a1.sinks.k1.table = httpdata
a1.sinks.k1.columnFamily = a
a1.sinks.k1.serializer = com.hbase.Rowkey

# Use a file channel that buffers events on local disk
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/mathartsys/oyzm_test/flu-hbase/checkpoint/
a1.channels.c1.useDualCheckpoints = false
a1.channels.c1.dataDirs = /home/mathartsys/oyzm_test/flu-hbase/flumedir/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
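Using a file channel rather than a memory channel is what satisfies the backup requirement from the goal above: buffered events are persisted under checkpointDir and dataDirs on the local disk, so they survive an agent or cluster failure instead of living only in memory.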
4. Create the table in HBase: create 'httpdata','a'
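The table definition can be checked afterwards with describe 'httpdata' in the hbase shell.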
5. Flume start command: flume-ng agent -c . -f /mysoftware/flume-1.7.0/conf/hbase_simple.conf -n a1 -Dflume.root.logger=INFO,console
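Here -c points at the Flume configuration directory (the current directory in this case), -f at the agent configuration file above, and -n names the agent (a1, matching the prefix used in the configuration); the -D flag simply sends the log output to the console.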
6. Command to push data into Flume: curl -X POST -d '[{"body":"http:10.0.0.1_{asdasd}"}]' http://10.0.0.183:44444
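The payload is a JSON array of events because Flume's HTTP source uses the JSONHandler by default; each array element may carry headers and a body, and only the body is used here.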
Resulting data in HBase:
20181108104034_http:10.0.0.183   column=a:data, timestamp=1541644834926, value={asdasd}
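The row above is in the format printed by scan 'httpdata' in the hbase shell.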
Java source:
package com.hbase;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
public class Rowkey implements HbaseEventSerializer {

    // column family; this default is replaced by the value passed to initialize()
    private byte[] colFam = "cf".getBytes();
    // the event currently being serialized
    private Event currentEvent;

    public void initialize(Event event, byte[] colFam) {
        // colFam is the column family configured on the sink (a1.sinks.k1.columnFamily)
        this.currentEvent = event;
        this.colFam = colFam;
    }

    public void configure(Context context) {}

    public void configure(ComponentConfiguration conf) {}

    // build the Put: rowkey, column qualifier and value
    public List<Row> getActions() {
        // the event body has the format url_data
        String eventStr = new String(currentEvent.getBody());
        // split only on the first "_" so that data containing "_" stays intact
        String[] parts = eventStr.split("_", 2);
        String url = parts[0];
        String data = parts[1];
        // current system time, formatted as yyyyMMddHHmmss
        Date d = new Date();
        SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");
        // rowkey = currentTime_url
        byte[] currentRowKey = (df.format(d) + "_" + url).getBytes();
        // HBase Put: column family, qualifier "data", value
        List<Row> puts = new ArrayList<Row>();
        Put putReq = new Put(currentRowKey);
        putReq.addColumn(colFam, "data".getBytes(), data.getBytes());
        puts.add(putReq);
        return puts;
    }

    public List<Increment> getIncrements() {
        // this sink does not use counter increments
        return new ArrayList<Increment>();
    }

    // release references when the serializer is closed
    public void close() {
        colFam = null;
        currentEvent = null;
    }
}
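To sanity-check the serializer before deploying the jar, a minimal sketch like the following can be used (RowkeyTest is a hypothetical helper class, not part of the Flume setup; it assumes the Flume and HBase client jars from the pom below are on the classpath):

package com.hbase;

import java.util.List;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;

// Hypothetical local check: run one event through Rowkey and print the resulting rowkey.
public class RowkeyTest {
    public static void main(String[] args) {
        Event event = EventBuilder.withBody("http:10.0.0.1_{asdasd}".getBytes());
        Rowkey serializer = new Rowkey();
        serializer.initialize(event, "a".getBytes()); // same column family as a1.sinks.k1.columnFamily
        List<Row> actions = serializer.getActions();
        Put put = (Put) actions.get(0);
        System.out.println("rowkey: " + new String(put.getRow())); // e.g. 20181108104034_http:10.0.0.1
        serializer.close();
    }
}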
pom file (dependencies section):
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flume.flume-ng-sinks</groupId>
        <artifactId>flume-ng-hbase-sink</artifactId>
        <version>1.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.4</version>
    </dependency>
    <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.8</version>
        <scope>system</scope>
        <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
</dependencies>
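With this pom, the jar from step 1 can be built with a standard mvn clean package (assuming a normal Maven project layout); the resulting jar is the one copied into Flume's lib directory in step 2.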