
Reading and writing HBase from Spark via Phoenix (Java)

pom.xml

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.2.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-core -->
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-core</artifactId>
        <version>4.13.1-HBase-1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-spark</artifactId>
        <version>4.13.1-HBase-1.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
    <dependency>
        <groupId>joda-time</groupId>
        <artifactId>joda-time</artifactId>
        <version>2.9.9</version>
    </dependency>
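
The job below reads from an events table and writes to a test_string table; the original post does not show their DDL. The following is a minimal sketch of plausible definitions created over plain Phoenix JDBC. The column names come from the code, but the types and primary keys are assumptions to adapt to your actual schema (date and time are quoted here because they collide with Phoenix type keywords):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateTables {
    public static void main(String[] args) throws Exception {
        // Phoenix registers its JDBC driver automatically when phoenix-core is on the classpath
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:hadoop101,hadoop102,hadoop103");
             Statement stmt = conn.createStatement()) {
            // Assumed source schema: one row per login event
            stmt.execute("CREATE TABLE IF NOT EXISTS events ("
                    + "\"date\" VARCHAR NOT NULL, "
                    + "\"time\" VARCHAR NOT NULL, "
                    + "member_id BIGINT NOT NULL, "
                    + "event VARCHAR "
                    + "CONSTRAINT pk PRIMARY KEY (\"date\", \"time\", member_id))");
            // Assumed target schema matching the DataFrame written by the job below
            stmt.execute("CREATE TABLE IF NOT EXISTS test_string ("
                    + "\"date\" VARCHAR NOT NULL PRIMARY KEY, "
                    + "dist_mem VARCHAR)");
        }
    }
}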

Example

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.joda.time.DateTime;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Test {
    /**
     * phoenix jdbc config
     */
    private static final String DB_PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
    private static final String DB_PHOENIX_URL = "jdbc:phoenix:hadoop101,hadoop102,hadoop103";
    private static final String DB_PHOENIX_USER = "";
    private static final String DB_PHOENIX_PASS = "";
    private static final String DB_PHOENIX_FETCHSIZE = "10000";
    /**
     * Query used to load the source data
     */
    private static final String SQL_QUERY = "(SELECT date,member_id FROM events WHERE time>='%s' AND time<'%s' AND event='login') events";
    /**
     * Job name
     */
    private static final String APP_NAME = "Test";

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName(APP_NAME)
                .setMaster("local[1]")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(
                        new Class[]{}
                );
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        DateTime start = new DateTime(args[0]), end = new DateTime(args[1]);
        String sql = String.format(SQL_QUERY, start.toString("yyyy-MM-dd"), end.toString("yyyy-MM-dd"));

        // Deduplicate, concatenate the member ids into a string and store the
        // result; kept for comparison against the later binary-encoding test
        sinkDataByVarchar(sparkSession, sql);

        sparkSession.stop();
    }
    /**
     * Deduplicate and store using plain string concatenation
     *
     * @param sparkSession active Spark session
     * @param query        query string wrapped as a JDBC table expression
     */
    private static void sinkDataByVarchar(SparkSession sparkSession, String query) {
        try {
            // JDBC connection properties
            Properties connProp = new Properties();
            connProp.put("driver", DB_PHOENIX_DRIVER);
            connProp.put("user", DB_PHOENIX_USER);
            connProp.put("password", DB_PHOENIX_PASS);
            connProp.put("fetchsize", DB_PHOENIX_FETCHSIZE);
            JavaRDD<Row> rows = sparkSession
                    .read()
                    .jdbc(DB_PHOENIX_URL, query, connProp)
                    .filter("member_id != -1")
                    .javaRDD()
                    .mapToPair(r -> new Tuple2<>(
                            r.getString(0)
                            , r.getLong(1)
                    ))
                    .distinct()
                    .groupByKey()
                    .map(r -> {
                        // Concatenate all member ids for one date into a single
                        // string (no separator, matching the original size test)
                        StringBuilder sb = new StringBuilder();
                        r._2.forEach(v -> sb.append(v));

                        return RowFactory.create(r._1, sb.toString());
                    });
            // schema
            List<StructField> fields = new ArrayList<>();
            fields.add(DataTypes.createStructField("date", DataTypes.StringType, false));
            fields.add(DataTypes.createStructField("dist_mem", DataTypes.StringType, true));
            StructType schema = DataTypes.createStructType(fields);
            // Write out through the phoenix-spark connector
            String insertTable = "test_string";
            sparkSession
                    .createDataFrame(rows, schema)
                    .write()
                    .format("org.apache.phoenix.spark")
                    .mode(SaveMode.Overwrite)
                    .option("table", insertTable)
                    .option("zkUrl", DB_PHOENIX_URL)
                    .save();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
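
To sanity-check the write, the result table can be read back through the same phoenix-spark connector. A minimal sketch, reusing the table name and zkUrl from the job above (the local[1] master is only for local testing):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadBack {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("ReadBack")
                .master("local[1]")
                .getOrCreate();
        // Load the table written by the job above
        Dataset<Row> result = spark
                .read()
                .format("org.apache.phoenix.spark")
                .option("table", "test_string")
                .option("zkUrl", "jdbc:phoenix:hadoop101,hadoop102,hadoop103")
                .load();
        result.show(10, false);
        spark.stop();
    }
}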

Invocation:

public class App {
    // Use the fully qualified JUnit annotation so it does not clash with the
    // Test class above when both live in the same package
    @org.junit.Test
    public void testJob() {
        String[] args = new String[]{"2017-06-01", "2017-07-01"};
        Test.main(args);
    }
}
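
The stored rows can also be checked outside Spark with a plain Phoenix JDBC query. A minimal sketch against the same quorum; the quoting of date follows the assumed DDL above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class Verify {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:hadoop101,hadoop102,hadoop103");
             Statement stmt = conn.createStatement();
             // Peek at a few rows of the result table
             ResultSet rs = stmt.executeQuery("SELECT \"date\", dist_mem FROM test_string LIMIT 10")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + " -> " + rs.getString(2));
            }
        }
    }
}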