1. 程式人生 > 其它 >Flink1.12.1通過Table API / Flink SQL讀取HBase2.4.0

Flink1.12.1通過Table API / Flink SQL讀取HBase2.4.0

昨天群裡有人問 Flink 1.12 讀取Hbase的問題,於是看到這篇文章分享給大家。本文作者Ashiamd。

1. 環境

廢話不多說,這裡用到的環境如下(不確定是否都必要,但是至少我是這個環境)

  • zookeeper 3.6.2
  • Hbase 2.4.0
  • Flink 1.12.1

2. HBase表

# Create table 'u_m_01' with a single column family 'u_m_r'
create 'u_m_01' , 'u_m_r'

# Insert sample data.
# Row key format is "userId,movieId"; column u_m_r:r holds the rating.
# Values are written as strings by the HBase shell, which is why the
# Flink table definition later declares the columns as STRING.
put 'u_m_01', 'a,A', 'u_m_r:r' , '1'
put 'u_m_01', 'a,B', 'u_m_r:r' , '3'
put 'u_m_01', 'b,B', 'u_m_r:r' , '3'
put 'u_m_01', 'b,C', 'u_m_r:r' , '4'
put 'u_m_01', 'c,A', 'u_m_r:r' , '2'
put 'u_m_01', 'c,C', 'u_m_r:r' , '5'
put 'u_m_01', 'c,D', 'u_m_r:r' , '1'
put 'u_m_01', 'd,B', 'u_m_r:r' , '5'
put 'u_m_01', 'd,D', 'u_m_r:r' , '2'
put 'u_m_01', 'e,A', 'u_m_r:r' , '3'
put 'u_m_01', 'e,B', 'u_m_r:r' , '2'
put 'u_m_01', 'f,A', 'u_m_r:r' , '1'
put 'u_m_01', 'f,B', 'u_m_r:r' , '2'
put 'u_m_01', 'f,D', 'u_m_r:r' , '3'
put 'u_m_01', 'g,C', 'u_m_r:r' , '1'
put 'u_m_01', 'g,D', 'u_m_r:r' , '4'
put 'u_m_01', 'h,A', 'u_m_r:r' , '1'
put 'u_m_01', 'h,B', 'u_m_r:r' , '2'
put 'u_m_01', 'h,C', 'u_m_r:r' , '4'
put 'u_m_01', 'h,D', 'u_m_r:r' , '5'

3. pom依賴

  • jdk1.8
  • Flink1.12.1 使用的pom依賴如下(有些是多餘的)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-hive-hbase</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hive.version>3.1.2</hive.version>
        <mysql.version>8.0.19</mysql.version>
        <hbase.version>2.4.0</hbase.version>
    </properties>


    <dependencies>

        <!-- Flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- HBase -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>


        <!--        &lt;!&ndash; JDBC &ndash;&gt;-->
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>-->
<!--            <version>${flink.version}</version>-->
<!--        </dependency>-->

<!--        &lt;!&ndash; mysql &ndash;&gt;-->
<!--        <dependency>-->
<!--            <groupId>mysql</groupId>-->
<!--            <artifactId>mysql-connector-java</artifactId>-->
<!--            <version>${mysql.version}</version>-->
<!--        </dependency>-->

<!--        &lt;!&ndash; Hive Dependency &ndash;&gt;-->
<!--        <dependency>-->
<!--            <groupId>org.apache.hive</groupId>-->
<!--            <artifactId>hive-exec</artifactId>-->
<!--            <version>${hive.version}</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>-->
<!--            <version>${flink.version}</version>-->
<!--        </dependency>-->

        <!-- Table API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- csv -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

<!--        &lt;!&ndash; Lombok &ndash;&gt;-->
<!--        <dependency>-->
<!--            <groupId>org.projectlombok</groupId>-->
<!--            <artifactId>lombok</artifactId>-->
<!--            <version>1.18.18</version>-->
<!--        </dependency>-->

    </dependencies>
</project>

用到的pojo類

package entity;
import java.io.Serializable;

/**
 * Plain serializable POJO holding one (user, movie, rating) record parsed
 * from an HBase row whose key is "userId,movieId".
 *
 * Note: "ratting" (sic) is kept as-is because it is part of the public
 * accessor interface used by callers.
 */
public class UserMovie implements Serializable {

    private static final long serialVersionUID = 256158274329337559L;

    // First half of the HBase row key.
    private String userId;

    // Second half of the HBase row key.
    private String movieId;

    // Value stored in column u_m_r:r.
    private Double ratting;

    /** No-arg constructor required for serialization frameworks. */
    public UserMovie() {
    }

    /** Convenience constructor initializing all three fields. */
    public UserMovie(String userId, String movieId, Double ratting) {
        this.userId = userId;
        this.movieId = movieId;
        this.ratting = ratting;
    }

    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getMovieId() {
        return movieId;
    }

    public void setMovieId(String movieId) {
        this.movieId = movieId;
    }

    public Double getRatting() {
        return ratting;
    }

    public void setRatting(Double ratting) {
        this.ratting = ratting;
    }

    @Override
    public String toString() {
        // %s on a Double yields Double.toString, matching string concatenation.
        return String.format("UserMovie{userId='%s', movieId='%s', ratting=%s}",
                userId, movieId, ratting);
    }

}

實際測試程式碼

package hbase;

import com.nimbusds.jose.util.IntegerUtils;
import entity.UserMovie;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.StringUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Demo: read HBase table 'default:u_m_01' via Flink Table API / SQL
 * (Flink 1.12.1, HBase 2.4.0) and materialize the rows into UserMovie POJOs.
 *
 * Requires a running HBase with ZooKeeper reachable at 127.0.0.1:2181.
 */
public class HBaseTest_01 {
    public static void main(String[] args) throws Exception {
        // Table environment in batch mode using the Blink planner.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Register the user-movie table u_m backed by HBase 'default:u_m_01'.
        // Columns are declared STRING because the HBase shell stored the
        // values as raw strings; declaring INT makes deserialization fail.
        tableEnv.executeSql(
                "CREATE TABLE u_m (" +
                        " rowkey STRING," +
                        " u_m_r ROW<r STRING>," +
                        " PRIMARY KEY (rowkey) NOT ENFORCED" +
                        " ) WITH (" +
                        " 'connector' = 'hbase-2.2' ," +
                        " 'table-name' = 'default:u_m_01' ," +
                        " 'zookeeper.quorum' = '127.0.0.1:2181'" +
                        " )");

        // Equivalent to an HBase scan of the whole table.
        Table table = tableEnv.sqlQuery("SELECT * FROM u_m");
        TableResult executeResult = table.execute();

        List<UserMovie> userMovieList = new ArrayList<>();

        // FIX: the original called executeResult.print() before iterating;
        // print() consumes the one-shot result iterator, so the list was
        // always left empty. Consume through the iterator only, and close
        // it with try-with-resources so the job resources are released.
        try (CloseableIterator<Row> rows = executeResult.collect()) {
            rows.forEachRemaining(row -> {
                // Row key is "userId,movieId"; field 1 is u_m_r:r (rating).
                String[] userMovie = String.valueOf(row.getField(0)).split(",");
                Double ratting = Double.valueOf(String.valueOf(row.getField(1)));
                userMovieList.add(new UserMovie(userMovie[0], userMovie[1], ratting));
            });
        }

        System.out.println("................");

        for (UserMovie um : userMovieList) {
            System.out.println(um);
        }
    }
}

5. 輸出

  1. 沒有註解掉第59行程式碼executeResult.print();時
+--------------------------------+--------------------------------+
|                         rowkey |                          u_m_r |
+--------------------------------+--------------------------------+
|                            a,A |                              1 |
|                            a,B |                              3 |
|                            b,B |                              3 |
|                            b,C |                              4 |
|                            c,A |                              2 |
|                            c,C |                              5 |
|                            c,D |                              1 |
|                            d,B |                              5 |
|                            d,D |                              2 |
|                            e,A |                              3 |
|                            e,B |                              2 |
|                            f,A |                              1 |
|                            f,B |                              2 |
|                            f,D |                              3 |
|                            g,C |                              1 |
|                            g,D |                              4 |
|                            h,A |                              1 |
|                            h,B |                              2 |
|                            h,C |                              4 |
|                            h,D |                              5 |
+--------------------------------+--------------------------------+
20 rows in set
................
  1. 註解掉第59行程式碼executeResult.print();時
................
UserMovie{userId='a', movieId='A', ratting=1.0}
UserMovie{userId='a', movieId='B', ratting=3.0}
UserMovie{userId='b', movieId='B', ratting=3.0}
UserMovie{userId='b', movieId='C', ratting=4.0}
UserMovie{userId='c', movieId='A', ratting=2.0}
UserMovie{userId='c', movieId='C', ratting=5.0}
UserMovie{userId='c', movieId='D', ratting=1.0}
UserMovie{userId='d', movieId='B', ratting=5.0}
UserMovie{userId='d', movieId='D', ratting=2.0}
UserMovie{userId='e', movieId='A', ratting=3.0}
UserMovie{userId='e', movieId='B', ratting=2.0}
UserMovie{userId='f', movieId='A', ratting=1.0}
UserMovie{userId='f', movieId='B', ratting=2.0}
UserMovie{userId='f', movieId='D', ratting=3.0}
UserMovie{userId='g', movieId='C', ratting=1.0}
UserMovie{userId='g', movieId='D', ratting=4.0}
UserMovie{userId='h', movieId='A', ratting=1.0}
UserMovie{userId='h', movieId='B', ratting=2.0}
UserMovie{userId='h', movieId='C', ratting=4.0}
UserMovie{userId='h', movieId='D', ratting=5.0}

注意

這裡我們在Flink SQL裡面定義HBase的Table時,指定的欄位都是用的STRING型別。雖然評分本來應該是INT,但是用INT的時候報錯了(HBase shell寫入的值是位元組字串,無法按INT反序列化),改成STRING就ok了。

博主教你手擼JVM 開源專案
https://github.com/anons-org/nada
https://gitee.com/grateful/nada
博主長期對外收徒,歡迎諮詢。
《程式語言設計和實現》《MUD遊戲開發》《軟體破解和加密》《遊戲輔助外掛》《JAVA開發》 以上課程非誠勿擾!



=================================
QQ:184377367
GOLang Q1群:6848027
GOLang Q2群:450509103
GOLang Q3群:436173132
GOLang Q4群:141984758
GOLang Q5群:215535604
Hadoop/mongodb(搭建/開發/運維)Q群481975850
C/C++/QT群 1414577
微控制器嵌入式/電子電路入門群群 306312845
MUD/LIB/巫師交流群 391486684
java/springboot/hadoop/JVM 群 4915800
Electron/koa/Nodejs/express 214737701
大前端群vue/js/ts 165150391
作業系統研發群:15375777
彙編/輔助/破解新手群:755783453
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段宣告,且在文章頁面明顯位置給出原文連線,否則保留追究法律責任的權利。