How-to: use Spark to support queries across MySQL tables and HBase tables
To query across MySQL tables and HBase tables, one good choice is Spark: its Parquet support and DataFrame API solve this problem, and Parquet is also a good choice for performance. Here are the steps:
- With sqlContext, the big MySQL tables can be loaded and saved as Parquet files in HDFS. Design this as a daily job. The code could look like the following:
Please notice that this code is based on Spark 1.3. From Spark 1.4 on, use sqlContext.read instead.
```scala
var options: HashMap[String, String] = new HashMap
options.put("driver", "com.mysql.jdbc.Driver")
options.put("url", url)
options.put("dbtable", table)
options.put("lowerBound", lower_bound.toString())
options.put("upperBound", upper_bound.toString())
// partitions are based on lower_bound and upper_bound
```
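These options only configure the JDBC source. A minimal sketch of the rest of the daily job, assuming the Spark 1.3 `sqlContext.load` / `saveAsParquetFile` API and made-up names for the partition column, `partitions`, and `base_dir`, could look like this:

```scala
// Hypothetical completion of the daily dump job (names below are assumptions,
// not part of the original post). options is assumed to be a java.util.HashMap.
options.put("partitionColumn", "id")               // numeric column used to split the table
options.put("numPartitions", partitions.toString)  // parallel JDBC readers between lowerBound and upperBound

val df = sqlContext.load("jdbc", options)          // Spark 1.3 API; from 1.4: sqlContext.read.format("jdbc").options(...).load()
df.saveAsParquetFile(base_dir + "/" + table)       // Spark 1.3 API; from 1.4: df.write.parquet(...)
```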
- Load the MySQL data from the HDFS Parquet files like the following:

```scala
sqlContext.parquetFile(base_dir + "/" + table).toDF().registerTempTable(table)
```
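Since the daily job usually dumps several MySQL tables, the same load-and-register step can simply be repeated per table. A small sketch, with `mysql_tables` and the table names made up for illustration:

```scala
// Hypothetical loop: register every dumped MySQL table as a temp table.
val mysql_tables = Seq("orders", "users")
for (table <- mysql_tables) {
  // parquetFile is the Spark 1.3 call; from 1.4 use sqlContext.read.parquet(...)
  sqlContext.parquetFile(base_dir + "/" + table).registerTempTable(table)
}
```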
- Load the HBase table as a DataFrame and register it as a table:

```scala
// hc is a helper object wrapping the HBase schema; its methods are shown below
// ......
sqlContext.createDataFrame(hc.toRowRDD(hc.createPairRDD(jsc, config)), hc.schema()).toDF().registerTempTable(table)
```
```java
// hc.createPairRDD:
public JavaPairRDD<ImmutableBytesWritable, Result> createPairRDD(JavaSparkContext jsc, Configuration conf) {
    // Scan the HBase table through TableInputFormat as (rowkey, Result) pairs
    return jsc.newAPIHadoopRDD(conf, TableInputFormat.class,
            ImmutableBytesWritable.class, Result.class).cache();
}
```
```java
// hc.toRowRDD:
public JavaRDD<Row> toRowRDD(JavaPairRDD<ImmutableBytesWritable, Result> pairRDD) {
    return pairRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, Row>() {
        private static final long serialVersionUID = -4887770613787757684L;

        public Row call(Tuple2<ImmutableBytesWritable, Result> re) throws Exception {
            Result result = re._2();
            Row row = null;
            // No configured columns means "take every column of the family"
            if (schema.getColumns().length == 0) {
                row = getAll(result);
            } else {
                row = get(result);
            }
            return row;
        }

        // Build a Row from the configured columns only
        public Row get(Result result) throws Exception {
            List<Object> values = new ArrayList<Object>();
            for (String col : schema.getColumns()) {
                byte[] b = result.getValue(schema.getFamily().getBytes(), col.getBytes());
                if (b == null) {
                    values.add("0"); // default for missing cells
                    continue;
                }
                values.add(new String(b));
            }
            return RowFactory.create(values.toArray(new Object[values.size()]));
        }

        // Build a Row from every column in the family
        public Row getAll(Result result) throws Exception {
            NavigableMap<byte[], byte[]> map = result.getFamilyMap(schema.getFamily().getBytes());
            List<Object> values = new ArrayList<Object>();
            for (byte[] key : map.keySet()) {
                values.add(new String(map.get(key)));
            }
            return RowFactory.create(values.toArray(new Object[values.size()]));
        }
    });
}
```
```java
// hc.schema():
// Every configured HBase column becomes a nullable string field
public StructType schema() {
    final List<StructField> keyFields = new ArrayList<StructField>();
    for (String fieldName : this.hbase_columns) { // hbase_columns is a String[]
        keyFields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
    }
    return DataTypes.createStructType(keyFields);
}
```
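The snippets above rely on a `jsc` (JavaSparkContext) and an HBase `config` that are never shown. A minimal sketch of how they could be built, where the table name, column family, and the existing `sc` SparkContext are assumptions:

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.api.java.JavaSparkContext

// Hypothetical setup for the HBase scan consumed by hc.createPairRDD.
val jsc = new JavaSparkContext(sc)                         // wrap the driver's existing SparkContext
val config = HBaseConfiguration.create()                   // picks up hbase-site.xml from the classpath
config.set(TableInputFormat.INPUT_TABLE, "my_hbase_table") // placeholder table name
config.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf")      // optional: limit the scan to one family
```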
- Run SQL as follows and save the result in HDFS:

```scala
val rdd_parquet = sqlContext.sql(sql)
rdd_parquet.rdd.saveAsTextFile(output)
```
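With the MySQL-backed Parquet tables and the HBase table all registered as temp tables, the SQL can join across both sources. A hypothetical example, with all table and column names made up for illustration:

```scala
// "orders" stands for a MySQL table dumped to Parquet, "user_profile" for the HBase-backed temp table.
val sql =
  """SELECT o.user_id, o.amount, p.city
    |FROM orders o
    |JOIN user_profile p ON o.user_id = p.user_id""".stripMargin
sqlContext.sql(sql).rdd.saveAsTextFile(output)
```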