實現將Kafka Topic中的資料傳入HBase
阿新 • • 發佈:2021-01-08
技術標籤:菜鳥也學大資料、kafka、HBase、大資料、kafka、hbase
準備前的操作
- 主機對映:點選這裡
建立Maven專案
- 在Pom.xml中新增依賴
<!-- 根據自己使用的kafka、HBase版本進行修改 -->
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.0</version>
</dependency>
</dependencies>
實現程式碼(JAVA)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
/**
 * Consumes records from the Kafka topic {@code event_attendees_row} and writes
 * them to the HBase table {@code events_db:event_attendee}.
 *
 * <p>Each record value is split on {@code ","}; the first three fields are stored
 * in column family {@code euat} under the qualifiers {@code eventid},
 * {@code userid} and {@code state}. Records with fewer than three fields (or a
 * null value) are reported on stderr and skipped.
 *
 * <p>Offsets are committed manually, and only after the batch has been written to
 * HBase, so a failure between poll and write causes the batch to be re-read
 * rather than lost.
 *
 * @author Zhangyu
 * @since 2021/1/6
 */
public class EventAttendTohb {

    private static final String TOPIC = "event_attendees_row";
    private static final String TABLE_NAME = "events_db:event_attendee";

    // Column family / qualifiers are constant, so encode them once instead of per record.
    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("euat");
    private static final byte[] COL_EVENT_ID = Bytes.toBytes("eventid");
    private static final byte[] COL_USER_ID = Bytes.toBytes("userid");
    private static final byte[] COL_STATE = Bytes.toBytes("state");

    /**
     * Runs the consume-and-write loop until the process is stopped or an
     * {@link IOException} is raised by HBase.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Kafka consumer settings.
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.**.**:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // Auto-commit is off: offsets are committed explicitly after each HBase write below.
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "eventAttend");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // HBase connection settings.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.95.99:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.95.99");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        // try-with-resources closes the table, connection and consumer (in that order)
        // when the loop is left through an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
             Connection connection = ConnectionFactory.createConnection(conf);
             Table eventAttendTable = connection.getTable(TableName.valueOf(TABLE_NAME))) {

            consumer.subscribe(Collections.singleton(TOPIC));

            while (true) {
                // Wait up to 100 ms for new records.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    continue;
                }

                // The batch list is created per poll so it never grows without bound.
                List<Put> batch = new ArrayList<>(records.count());
                for (ConsumerRecord<String, String> record : records) {
                    // Echo the raw value so that pulled data is visible on the console.
                    System.out.println(record.value());
                    Put put = toPut(record.value());
                    if (put != null) {
                        batch.add(put);
                    }
                }

                if (!batch.isEmpty()) {
                    eventAttendTable.put(batch);
                }
                // Commit only after the write succeeded; without this call the group
                // would restart from the earliest offset on every run.
                consumer.commitSync();
            }
        } catch (IOException e) {
            System.err.println("HBase I/O failure, stopping consumer: " + e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * Converts one comma-separated record value into an HBase {@link Put}.
     *
     * @param value raw record value; may be null
     * @return the Put for this record, or {@code null} if the value is null or has
     *         fewer than three comma-separated fields
     */
    private static Put toPut(String value) {
        if (value == null) {
            System.err.println("Skipping record with null value");
            return null;
        }
        String[] fields = value.split(",");
        if (fields.length < 3) {
            System.err.println("Skipping malformed record (expected at least 3 fields): " + value);
            return null;
        }

        // NOTE(review): the row key is the 32-bit hashCode of the concatenated fields,
        // kept as-is for compatibility with rows already written. Distinct records can
        // collide on this key and overwrite each other — consider a composite key.
        Put put = new Put(Bytes.toBytes((fields[0] + fields[1] + fields[2]).hashCode()));
        // Bytes.toBytes(String) encodes as UTF-8, independent of the platform charset.
        put.addColumn(COLUMN_FAMILY, COL_EVENT_ID, Bytes.toBytes(fields[0]));
        put.addColumn(COLUMN_FAMILY, COL_USER_ID, Bytes.toBytes(fields[1]));
        put.addColumn(COLUMN_FAMILY, COL_STATE, Bytes.toBytes(fields[2]));
        return put;
    }
}