Reading and writing HBase from Spark through Phoenix (Java version)
阿新 · Published 2019-01-23
pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-core -->
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>4.13.1-HBase-1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>4.13.1-HBase-1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>2.9.9</version>
</dependency>
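One caveat before the example: the phoenix-spark writer upserts into an existing Phoenix table; it does not create the table for you, so the target table must exist before the job runs. Below is a minimal sketch of creating it over plain Phoenix JDBC, assuming the same ZooKeeper quorum as the example that follows. The class name CreateTargetTable and the DDL are illustrative; adjust both to your actual schema.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateTargetTable {
    public static void main(String[] args) throws Exception {
        // same quorum the example below connects to (an assumption; adjust to your cluster)
        String url = "jdbc:phoenix:hadoop101,hadoop102,hadoop103";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement()) {
            // matches the schema written by sinkDataByVarchar below; if your
            // Phoenix version rejects date as a bare column name, quote it
            // (which makes the identifier case-sensitive)
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS test_string ("
                    + "date VARCHAR PRIMARY KEY, "
                    + "dist_mem VARCHAR)");
        }
    }
}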
Example
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.joda.time.DateTime;

import scala.Tuple2;

public class Test {

    /**
     * Phoenix JDBC config
     */
    private static final String DB_PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
    private static final String DB_PHOENIX_URL = "jdbc:phoenix:hadoop101,hadoop102,hadoop103";
    private static final String DB_PHOENIX_USER = "";
    private static final String DB_PHOENIX_PASS = "";
    private static final String DB_PHOENIX_FETCHSIZE = "10000";

    /**
     * Query for loading the data, wrapped as a subquery so it can be passed
     * to DataFrameReader.jdbc() in place of a table name
     */
    private static final String SQL_QUERY =
            "(SELECT date,member_id FROM events WHERE time>='%s' AND time<'%s' AND event='login') events";

    /**
     * Job name
     */
    private static final String APP_NAME = "Test";

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName(APP_NAME)
                .setMaster("local[1]")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(new Class[]{});
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();

        DateTime start = new DateTime(args[0]), end = new DateTime(args[1]);
        String sql = String.format(SQL_QUERY, start.toString("yyyy-MM-dd"), end.toString("yyyy-MM-dd"));

        // Deduplicate, concatenate into a string and store; this serves as the
        // baseline to compare against the later binary-storage test
        sinkDataByVarchar(sparkSession, sql);

        sparkSession.stop();
    }

    /**
     * Deduplicate the plain way and store the result
     *
     * @param sparkSession active Spark session
     * @param query        Phoenix subquery to load from
     */
    private static void sinkDataByVarchar(SparkSession sparkSession, String query) {
        try {
            // JDBC connection properties
            Properties connProp = new Properties();
            connProp.put("driver", DB_PHOENIX_DRIVER);
            connProp.put("user", DB_PHOENIX_USER);
            connProp.put("password", DB_PHOENIX_PASS);
            connProp.put("fetchsize", DB_PHOENIX_FETCHSIZE);

            JavaRDD<Row> rows = sparkSession
                    .read()
                    .jdbc(DB_PHOENIX_URL, query, connProp)
                    .filter("member_id != -1")
                    .javaRDD()
                    // key by date, value is the member id
                    .mapToPair(r -> new Tuple2<>(r.getString(0), r.getLong(1)))
                    .distinct()
                    .groupByKey()
                    // concatenate all distinct member ids of one date into a
                    // single string (note: ids are appended with no delimiter)
                    .map(r -> {
                        StringBuilder sb = new StringBuilder();
                        r._2.forEach(v -> sb.append(v));
                        return RowFactory.create(r._1, sb.toString());
                    });

            // schema of the output DataFrame
            List<StructField> fields = new ArrayList<>();
            fields.add(DataTypes.createStructField("date", DataTypes.StringType, false));
            fields.add(DataTypes.createStructField("dist_mem", DataTypes.StringType, true));
            StructType schema = DataTypes.createStructType(fields);

            // write to Phoenix
            String insertTable = "test_string";
            sparkSession
                    .createDataFrame(rows, schema)
                    .write()
                    .format("org.apache.phoenix.spark")
                    .mode(SaveMode.Overwrite)
                    .option("table", insertTable)
                    .option("zkUrl", DB_PHOENIX_URL)
                    .save();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
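The read above goes through Spark's generic JDBC source, which is why the query has to be wrapped as a subquery. The phoenix-spark dependency also provides its own DataSource, which loads a table directly and pushes simple filter predicates down to Phoenix. A minimal sketch, reusing DB_PHOENIX_URL and the events table from the class above; the helper method name readEventsViaPhoenixSpark is hypothetical, and org.apache.spark.sql.Dataset must be imported.

/**
 * Alternative read path using the phoenix-spark DataSource instead of
 * Spark's generic JDBC source.
 */
private static Dataset<Row> readEventsViaPhoenixSpark(SparkSession sparkSession) {
    return sparkSession
            .read()
            .format("org.apache.phoenix.spark")
            .option("table", "events")
            .option("zkUrl", DB_PHOENIX_URL)
            .load()
            // simple comparison predicates are pushed down to Phoenix
            .filter("event = 'login' AND member_id != -1");
}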
Invoking it:
public class App {
    // the JUnit annotation is fully qualified because importing org.junit.Test
    // would clash with the job class, which is also named Test
    @org.junit.Test
    public void testJob() {
        String[] args = new String[]{"2017-06-01", "2017-07-01"};
        Test.main(args);
    }
}
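After the test run, a quick way to confirm the write landed is to read the target table back over the same Phoenix JDBC URL. A hypothetical verification snippet; the class name Verify is illustrative, and it assumes the test_string table from above.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class Verify {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:phoenix:hadoop101,hadoop102,hadoop103";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             // print each date and the length of its concatenated id string
             ResultSet rs = stmt.executeQuery(
                     "SELECT date, LENGTH(dist_mem) FROM test_string")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + " -> " + rs.getInt(2));
            }
        }
    }
}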