1. 程式人生 > 其它 >實現將Kafka Topic中的資料傳入HBase

實現將Kafka Topic中的資料傳入HBase

技術標籤:菜鳥也學大資料、kafka、HBase、大資料

準備前的操作

建立Maven專案

  • 在Pom.xml中新增依賴
	<!-- 根據自己使用的kafka、HBase版本進行修改 -->
	<dependencies>
	  <dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka_2.11</artifactId>
	    <version>2.0.0</version>
	  </dependency>
	  <dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-streams</artifactId>
	    <version>2.0.0</version>
	  </dependency>
	  <dependency>
	    <groupId>org.apache.hbase</groupId>
	    <artifactId>hbase-client</artifactId>
	    <version>1.2.0</version>
	  </dependency>
	  <dependency>
	    <groupId>org.apache.hbase</groupId>
	    <artifactId>hbase-server</artifactId>
	    <version>1.2.0</version>
	  </dependency>
	</dependencies>

實現程式碼(JAVA)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/**
 * Consumes records from the Kafka topic {@code event_attendees_row} and writes
 * them into the HBase table {@code events_db:event_attendee}.
 *
 * <p>Each record value is expected to be a CSV line {@code eventid,userid,state}.
 * The row key is the hash of the three fields concatenated; each field is stored
 * under the column family {@code euat}.
 *
 * @author Zhangyu
 * @since 2021/1/6
 */
public class EventAttendTohb {
    public static void main(String[] args) {
        // Kafka consumer configuration
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.**.**:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // Manual offset management: offsets are committed only after a batch
        // has been successfully written to HBase (see commitSync() below).
        // The original code disabled auto-commit but never committed at all,
        // so every restart re-consumed the whole topic.
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "eventAttend");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // HBase connection configuration
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.95.99:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.95.99");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        // try-with-resources guarantees the consumer, the HBase connection and
        // the table are closed even if an exception escapes the poll loop
        // (the original leaked all three).
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
             Connection connection = ConnectionFactory.createConnection(conf);
             Table eventAttendTable =
                     connection.getTable(TableName.valueOf("events_db:event_attendee"))) {

            // Topic to read from
            consumer.subscribe(Collections.singleton("event_attendees_row"));

            while (true) {
                // Block for at most 100 ms waiting for new records
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));

                // Buffer for one batch of Puts; declared inside the loop so the
                // list does not grow without bound (which would eventually OOM).
                List<Put> datas = new ArrayList<>();

                for (ConsumerRecord<String, String> record : poll) {
                    // Echo the raw value so a failure can be traced to a record
                    System.out.println(record.value());

                    // Value format: "eventid,userid,state"
                    String[] split = record.value().split(",");
                    if (split.length < 3) {
                        // Skip malformed lines instead of letting an
                        // ArrayIndexOutOfBoundsException kill the consumer.
                        System.err.println("Skipping malformed record: " + record.value());
                        continue;
                    }

                    // Row key: hash of the three fields combined
                    Put put = new Put(Bytes.toBytes((split[0] + split[1] + split[2]).hashCode()));

                    // Column family "euat", one qualifier per field.
                    // Bytes.toBytes always encodes UTF-8, unlike String.getBytes()
                    // which silently depends on the platform default charset.
                    put.addColumn(Bytes.toBytes("euat"), Bytes.toBytes("eventid"), Bytes.toBytes(split[0]));
                    put.addColumn(Bytes.toBytes("euat"), Bytes.toBytes("userid"), Bytes.toBytes(split[1]));
                    put.addColumn(Bytes.toBytes("euat"), Bytes.toBytes("state"), Bytes.toBytes(split[2]));

                    datas.add(put);
                }

                if (!datas.isEmpty()) {
                    // Write the batch first, then commit offsets — at-least-once
                    // delivery: if the write fails the offsets are not advanced
                    // and the batch is re-polled after restart.
                    eventAttendTable.put(datas);
                    consumer.commitSync();
                }
            }
        } catch (IOException e) {
            // HBase connection/write failure: log and exit
            e.printStackTrace();
        }
    }
}