1. 程式人生 > >04 hbase提取kafka中的資料儲存

04 hbase提取kafka中的資料儲存

    上一篇中的測試時是採用kafka消費者,如果把消費者換成hbase就可以實現hbase提取kafka中的資料進行儲存。

      啟動hbase要先啟動hdfs,hbase需要zk

      啟動hdfs:start-dfs.sh 

       啟動hbase:start-hbase.sh

        要hbase高可用,需要在其他節點中啟動:hbase-daemon.sh start master

        各節點程序:

        

建立hbase消費者:

        在idea中需要引入hbase-site.xml以及hdfs-site.xml 檔案 一樣配置檔案外部化:

        

kafka.properties:

zookeeper.connect=s128:2181,s129:2181,s130:2181
group.id=g4  //使用者組
zookeeper.session.timeout.ms=500
zookeeper.sync.time.ms=250
auto.commit.interval.ms=1000
auto.offset.reset=smallest
#主題
topic=calllog    //kafka中的topic
#表名
table.name=ns1:calllogs //hbase中資料表名
#分割槽數
partition.number=100
#主叫標記
caller.flag=0
#hash區域的模式
hashcode.pattern=00

建立HbaseDao類,訪問hbase,進行資料相關操作:

/**
 * Hbase資料訪問物件
 */
public class HbaseDao {
    //
    private DecimalFormat df = new DecimalFormat() ;

    private Table table = null ;

    private int partitions ;

    private String flag  ;
    public HbaseDao(){
        try {
            Configuration conf = HBaseConfiguration.create();
            Connection conn = ConnectionFactory.createConnection(conf);
            TableName name = TableName.valueOf(PropertiesUtil.getProp("table.name"));
            table = conn.getTable(name);

            df.applyPattern(PropertiesUtil.getProp("hashcode.pattern"));

            partitions = Integer.parseInt(PropertiesUtil.getProp("partition.number"));
            flag = PropertiesUtil.getProp("caller.flag") ;
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * put資料到hbase
     */
    public void put(String log){
        if (log == null || log.equals("")) {
            return;
        }
        try {
            //解析日誌
            String[] arr = log.split(",");
            if (arr != null && arr.length == 4) {
                String caller = arr[0];
                String callee = arr[1];
                String callTime = arr[2];
                callTime = callTime.replace("/","") ;       //刪除/
                callTime = callTime.replace(" ","") ;       //刪除空格
                callTime = callTime.replace(":","") ;       //刪除空格

                String callDuration = arr[3];
                //結算區域號

                //構造put物件
                String rowkey = genRowkey(getHashcode(caller, callTime), caller, callTime, flag, callee, callDuration);
                //
                Put put = new Put(Bytes.toBytes(rowkey));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("caller"), Bytes.toBytes(caller));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callee"), Bytes.toBytes(callee));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callTime"), Bytes.toBytes(callTime));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callDuration"), Bytes.toBytes(callDuration));
                table.put(put);

            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String getHashcode(String caller ,String callTime){
        int len = caller.length();
        //取出後四位電話號碼
        String last4Code = caller.substring(len - 4);
        //取出時間單位,年份和月份.
        String mon = callTime.substring(0,6);
        //
        int hashcode = (Integer.parseInt(mon) ^ Integer.parseInt(last4Code)) % partitions ;
        return df.format(hashcode);
    }

    /**
     * 生成rowkey
     * @param hash
     * @param caller
     * @param time
     * @param flag
     * @param callee
     * @param duration
     * @return
     */
    public String genRowkey(String hash,String caller,String time,String flag,String callee,String duration){
        return hash + "," + caller + "," + time + "," + flag + "," + callee + "," + duration ;
    }
}

建立HbaseConsumer(hbase消費者):

**
 * Hbase消費者,從kafka提取資料,儲存到hbase中。
 */
public class HbaseConsumer {

    public static void main(String[] args) throws Exception {
        HbaseDao dao = new HbaseDao();
        //建立配置物件
        ConsumerConfig config = new ConsumerConfig(PropertiesUtil.props);

        //獲得主題
        String topic = PropertiesUtil.getProp("topic");
        //
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> msgs = Consumer.createJavaConsumerConnector(new ConsumerConfig(PropertiesUtil.props)).createMessageStreams(map);

        List<KafkaStream<byte[], byte[]>> msgList = msgs.get(topic);

        String msg = null ;
        for (KafkaStream<byte[], byte[]> stream : msgList) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                byte[] message = it.next().message();
                //取得kafka的訊息
                msg = new String(message) ;
                //寫入hbase中。
                dao.put(msg);
            }
        }
    }
}

打成jar包放到s128。

因為事先要到入很多相關包,所以在window下使用mvn命令,下載工件的所有依賴軟體包
----------------------------------------

mvn -DoutputDirectory=./lib -DgroupId=com.chenway -DartifactId=CallLogConsumerModule -Dversion=1.0-SNAPSHOT dependency:copy-dependencies -DgroupId=com.chenway -DartifactId=CallLogConsumerModule -Dversion=1.0-SNAPSHOT dependency:copy-dependencies

    將生成的所有jar包放入s128下lib資料夾

       編寫run-kafkaconsumer.sh指令碼:

    

執行生成資料以及hbase消費者指令碼:

        ./run-kafkaconsumer.sh

        ./calllog.sh

可以進入hbase shell

 檢視命令:scan ‘ns1:calllogs’