04 hbase提取kafka中的資料儲存
上一篇中的測試時是採用kafka消費者,如果把消費者換成hbase就可以實現hbase提取kafka中的資料進行儲存。
啟動hbase要先啟動hdfs,hbase需要zk
啟動hdfs:start-dfs.sh
啟動hbase:start-hbase.sh
要hbase高可用,需要在其他節點中啟動:hbase-daemon.sh start master
各節點程序:
建立hbase消費者:
在idea中需要引入hbase-site.xml以及hdfs-site.xml 檔案 一樣配置檔案外部化:
kafka.properties:
zookeeper.connect=s128:2181,s129:2181,s130:2181 group.id=g4 //使用者組 zookeeper.session.timeout.ms=500 zookeeper.sync.time.ms=250 auto.commit.interval.ms=1000 auto.offset.reset=smallest #主題 topic=calllog //kafka中的topic #表名 table.name=ns1:calllogs //hbase中資料表名 #分割槽數 partition.number=100 #主叫標記 caller.flag=0 #hash區域的模式 hashcode.pattern=00
建立HbaseDao類,訪問hbase,進行資料相關操作:
/** * Hbase資料訪問物件 */ public class HbaseDao { // private DecimalFormat df = new DecimalFormat() ; private Table table = null ; private int partitions ; private String flag ; public HbaseDao(){ try { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); TableName name = TableName.valueOf(PropertiesUtil.getProp("table.name")); table = conn.getTable(name); df.applyPattern(PropertiesUtil.getProp("hashcode.pattern")); partitions = Integer.parseInt(PropertiesUtil.getProp("partition.number")); flag = PropertiesUtil.getProp("caller.flag") ; } catch (Exception e) { e.printStackTrace(); } } /** * put資料到hbase */ public void put(String log){ if (log == null || log.equals("")) { return; } try { //解析日誌 String[] arr = log.split(","); if (arr != null && arr.length == 4) { String caller = arr[0]; String callee = arr[1]; String callTime = arr[2]; callTime = callTime.replace("/","") ; //刪除/ callTime = callTime.replace(" ","") ; //刪除空格 callTime = callTime.replace(":","") ; //刪除空格 String callDuration = arr[3]; //結算區域號 //構造put物件 String rowkey = genRowkey(getHashcode(caller, callTime), caller, callTime, flag, callee, callDuration); // Put put = new Put(Bytes.toBytes(rowkey)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("caller"), Bytes.toBytes(caller)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callee"), Bytes.toBytes(callee)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callTime"), Bytes.toBytes(callTime)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callDuration"), Bytes.toBytes(callDuration)); table.put(put); } } catch (Exception e) { e.printStackTrace(); } } public String getHashcode(String caller ,String callTime){ int len = caller.length(); //取出後四位電話號碼 String last4Code = caller.substring(len - 4); //取出時間單位,年份和月份. String mon = callTime.substring(0,6); // int hashcode = (Integer.parseInt(mon) ^ Integer.parseInt(last4Code)) % partitions ; return df.format(hashcode); } /** * 生成rowkey * @param hash * @param caller * @param time * @param flag * @param callee * @param duration * @return */ public String genRowkey(String hash,String caller,String time,String flag,String callee,String duration){ return hash + "," + caller + "," + time + "," + flag + "," + callee + "," + duration ; } }
建立HbaseConsumer(hbase消費者):
**
* Hbase消費者,從kafka提取資料,儲存到hbase中。
*/
public class HbaseConsumer {
public static void main(String[] args) throws Exception {
HbaseDao dao = new HbaseDao();
//建立配置物件
ConsumerConfig config = new ConsumerConfig(PropertiesUtil.props);
//獲得主題
String topic = PropertiesUtil.getProp("topic");
//
Map<String, Integer> map = new HashMap<String, Integer>();
map.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> msgs = Consumer.createJavaConsumerConnector(new ConsumerConfig(PropertiesUtil.props)).createMessageStreams(map);
List<KafkaStream<byte[], byte[]>> msgList = msgs.get(topic);
String msg = null ;
for (KafkaStream<byte[], byte[]> stream : msgList) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
byte[] message = it.next().message();
//取得kafka的訊息
msg = new String(message) ;
//寫入hbase中。
dao.put(msg);
}
}
}
}
打成jar包放到s128。
因為事先要到入很多相關包,所以在window下使用mvn命令,下載工件的所有依賴軟體包
----------------------------------------
mvn -DoutputDirectory=./lib -DgroupId=com.chenway -DartifactId=CallLogConsumerModule -Dversion=1.0-SNAPSHOT dependency:copy-dependencies -DgroupId=com.chenway -DartifactId=CallLogConsumerModule -Dversion=1.0-SNAPSHOT dependency:copy-dependencies
將生成的所有jar包放入s128下lib資料夾
編寫run-kafkaconsumer.sh指令碼:
執行生成資料以及hbase消費者指令碼:
./run-kafkaconsumer.sh
./calllog.sh
可以進入hbase shell
檢視命令:scan ‘ns1:calllogs’