
How-to: use Spark to support queries across MySQL tables and HBase tables

To resolve this, one good choice is Spark: its Parquet support and DataFrame API solve the problem, and Parquet is also a good choice for performance. Here are the steps:
  1. With sqlContext, the big MySQL tables can be loaded and saved as Parquet files in HDFS. Design this as a daily job. The code could look like the following:
    Please notice that this code is based on Spark 1.3. From Spark 1.4 on, please use sqlContext.read (see the sketch after this block).
    val partitions = (upper_bound - lower_bound) / lines_each_part

    var options: HashMap[String, String] = new HashMap
    options.put("driver", "com.mysql.jdbc.Driver")
    options.put("url", url)
    options.put("dbtable", table)
    options.put("lowerBound", lower_bound.toString())
    options.put("upperBound", upper_bound.toString())
    // partitions are based on lower_bound and upper_bound
    options.put("numPartitions", partitions.toString())
    options.put("partitionColumn", id)

    val jdbcDF = sqlContext.load("jdbc", options)
    jdbcDF.save(output)
  2. Load the MySQL data from the HDFS Parquet files like the following:
    sqlContext.parquetFile(base_dir + "/" + table).toDF().registerTempTable(table)
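    If several MySQL tables are dumped under the same base directory, they can all be registered in one pass; a small sketch (base_dir and the table names are just placeholders):

    // Sketch: register every dumped MySQL table as a temp table.
    val mysqlTables = Seq("orders", "users") // placeholder table names
    for (t <- mysqlTables) {
      sqlContext.parquetFile(base_dir + "/" + t).toDF().registerTempTable(t)
    }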
  3. Load the HBase table as a DataFrame and register it as a table:
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.HBaseAdmin
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat

    var config = HBaseConfiguration.create()
    config.addResource(new Path(System.getenv("HBASE_HOME") + "/conf/hbase-site.xml"))
    try {
      HBaseAdmin.checkHBaseAvailable(config)
      System.out.println("Detected HBase is running")
    } catch {
      case e => e.printStackTrace()
    }
    config.set(TableInputFormat.INPUT_TABLE, hbase_table)
    config.set(TableInputFormat.SCAN_COLUMN_FAMILY, columnF)
    ......
    sqlContext.createDataFrame(hc.toRowRDD(hc.createPairRDD(jsc, config)), hc.schema()).toDF().registerTempTable(table)

    //hc.createPairRDD:
    public JavaPairRDD<ImmutableBytesWritable, Result> createPairRDD(
            JavaSparkContext jsc, Configuration conf) {
        return jsc.newAPIHadoopRDD(
                conf,
                TableInputFormat.class,
                ImmutableBytesWritable.class,
                Result.class
                ).cache();
    }

    //hc.toRowRDD:
    public JavaRDD<Row> toRowRDD(JavaPairRDD<ImmutableBytesWritable, Result> pairRDD) {
        return pairRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, Row>() {
            private static final long serialVersionUID = -4887770613787757684L;

            public Row call(Tuple2<ImmutableBytesWritable, Result> re) throws Exception {
                Result result = re._2();
                Row row = null;
                // If no columns are configured, take every column of the family;
                // otherwise only the configured columns.
                if (schema.getColumns().length == 0) {
                    row = getAll(result);
                } else {
                    row = get(result);
                }
                return row;
            }

            public Row get(Result result) throws Exception {
                List<Object> values = new ArrayList<Object>();
                for (String col : schema.getColumns()) {
                    byte[] b = result.getValue(schema.getFamily().getBytes(), col.getBytes());
                    if (b == null) {
                        values.add("0");
                        continue;
                    }
                    values.add(new String(b));
                }
                Row row = RowFactory.create(values.toArray(new Object[values.size()]));
                return row;
            }

            public Row getAll(Result result) throws Exception {
                NavigableMap<byte[], byte[]> map = result.getFamilyMap(schema.getFamily().getBytes());
                List<Object> values = new ArrayList<Object>();
                for (byte[] key : map.keySet()) {
                    values.add(new String(map.get(key)));
                }
                Row row = RowFactory.create(values.toArray(new Object[values.size()]));
                return row;
            }
        });
    }

    //hc.schema():
    public StructType schema() {
        final List<StructField> keyFields = new ArrayList<StructField>();
        for (String fieldName : this.hbase_columns) { // hbase_columns is String[]
            keyFields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
        }
        return DataTypes.createStructType(keyFields);
    }
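    The schema object referenced above (getFamily() / getColumns()) is not shown in the original snippets; it only needs to hold the column family and the column qualifiers. A minimal hypothetical holder (here sketched in Scala; the class and parameter names are made up):

    // Hypothetical sketch of the schema holder used above: it only needs to
    // expose the HBase column family and the column qualifiers to read.
    class HBaseSchema(family: String, columns: Array[String]) {
      def getFamily(): String = family
      def getColumns(): Array[String] = columns
    }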
  4. Run the SQL as follows and save the result in HDFS:
    val rdd_parquet = sqlContext.sql(sql)
    rdd_parquet.rdd.saveAsTextFile(output)
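    For example, the SQL here can join across the two stores. A hypothetical query (the table and column names are placeholders, assuming a MySQL-derived orders table registered in step 2 and an HBase-backed user_profiles table registered in step 3):

    // Hypothetical cross-source join; table and column names are placeholders.
    val sql =
      """SELECT o.order_id, o.amount, u.profile
        |FROM orders o
        |JOIN user_profiles u ON o.user_id = u.user_id""".stripMargin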