Spark 2.0 Operations on HBase
This post covers reading from and writing to HBase through Spark RDDs. Going through the HBase client API directly is a different approach and is not covered here.
First, configure the pom and add the HBase dependencies. A Spark cluster generally does not ship the HBase jars, so these dependencies are not marked <scope>provided</scope>.
The relevant Maven section is as follows:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <hbase.version>0.98.8-hadoop2</hbase.version>
    <spark.artifactId.version>2.11</spark.artifactId.version>
</properties>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.2</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <artifactId>jdk.tools</artifactId>
            <groupId>jdk.tools</groupId>
        </exclusion>
        <exclusion>
            <artifactId>curator-framework</artifactId>
            <groupId>org.apache.curator</groupId>
        </exclusion>
        <exclusion>
            <artifactId>curator-recipes</artifactId>
            <groupId>org.apache.curator</groupId>
        </exclusion>
        <exclusion>
            <artifactId>curator-client</artifactId>
            <groupId>org.apache.curator</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>jdk.tools</artifactId>
            <groupId>jdk.tools</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>${hbase.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>${hbase.version}</version>
    <!-- <scope>provided</scope> -->
</dependency>
The Scala code is as follows:
package mytest
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
object hbaseSpark4 {
  val logger = LoggerFactory.getLogger(hbaseSpark4.getClass)

  def main(args: Array[String]): Unit = {
    val tableName: String = "table1"
    val writeTableName: String = "table2"
    // SevsConfig is a project-specific configuration helper; the column-family
    // prefix read here is not used anywhere else in this example.
    // val pixFix: String = SevsConfig.getProp("hbase.table.family")

    // the user/group values below look like config keys left as placeholders;
    // substitute the actual Hadoop user and group for your cluster
    val sparkconf = new SparkConf().setAppName("hbaseSpark4")
      .set("HADOOP_USER_NAME", "hbase.hadoop.username")
      .set("HADOOP_GROUP_NAME", "hbase.hadoop.groupname")
    //.setMaster("local")
    val sc = new SparkContext(sparkconf)
    read_hbase(sc, tableName)
    logger.error("write to hbase #################")
    save_hbase(sc, writeTableName)
    logger.error("ending ################")
  }
  def read_hbase(sc: SparkContext, tableName: String): Unit = {
    val configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.property.clientPort", "2015")
    configuration.set("zookeeper.znode.parent", "/hbase")
    configuration.set("hbase.zookeeper.quorum", "namenode1-sit.com,namenode2-sit.com,slave01-sit.com")
    configuration.set(TableInputFormat.INPUT_TABLE, tableName)
    val startRowkey = "0600000003"
    val endRowkey = "8800000008"
    val scan = new Scan(Bytes.toBytes(startRowkey), Bytes.toBytes(endRowkey))
    // scan.setCacheBlocks(false)
    // scan.addFamily(Bytes.toBytes("ks"))
    // scan.addColumn(Bytes.toBytes("ks"), Bytes.toBytes("data"))
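    // Optional tuning, not in the original post: a larger scanner cache reduces
    // RPC round trips for range scans; the value 500 is only an illustrative assumption.
    // scan.setCaching(500)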
    val proto = ProtobufUtil.toScan(scan)
    val scan_str = Base64.encodeBytes(proto.toByteArray())
    configuration.set(TableInputFormat.SCAN, scan_str)
    val rdd = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    val resultRDD = rdd.map(tup => resultToList(tup))
    resultRDD.take(2).foreach(println)
  }
  def resultToList(tableValues: Tuple2[ImmutableBytesWritable, Result]): String = {
    val result = tableValues._2
    val rowKey = result.getRow()
    // reverse the string form of the row key to recover the original id
    val rowId = Bytes.toString(rowKey).reverse
    rowId
  }
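  // Not in the original post: a minimal sketch of pulling individual cell values out of a
  // Result. The family "info" and qualifiers "name"/"age" are assumptions for illustration
  // (they match the write example below) and must match what the scanned table contains.
  def resultToColumns(tableValues: Tuple2[ImmutableBytesWritable, Result]): (String, String, Int) = {
    val result = tableValues._2
    val rowId = Bytes.toString(result.getRow()).reverse
    val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
    // the age was written as a 4-byte int, so decode it with Bytes.toInt
    val age = Bytes.toInt(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
    (rowId, name, age)
  }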
  def save_hbase(sc: SparkContext, writeTableName: String): Unit = {
    val configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.property.clientPort", "2015")
    configuration.set("zookeeper.znode.parent", "/hbase")
    configuration.set("hbase.zookeeper.quorum", "namenode1-sit.com,namenode2-sit.com,slave01-sit.com")
    configuration.set(TableOutputFormat.OUTPUT_TABLE, writeTableName)
    val job = Job.getInstance(configuration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    val indataRDD = sc.makeRDD(Array("1,jack,15", "2,Lily,16", "3,mike,16"))
    val rdd = indataRDD.map(_.split(',')).map { arr =>
      val put = new Put(Bytes.toBytes(arr(0)))
      put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      // the age is stored as a 4-byte int
      put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
      (new ImmutableBytesWritable, put)
    }
    // write the (rowkey, Put) pairs through TableOutputFormat
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
  }
}
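The SparkSession import above goes unused. In Spark 2.x the same SparkContext can also be obtained through a SparkSession; below is a minimal sketch of that entry point, reusing the read/write functions above. The object name and builder settings are placeholders, not part of the original example.
import org.apache.spark.sql.SparkSession

object hbaseSparkSessionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hbaseSpark4")
      // .master("local[*]")  // only for local testing, like the commented-out setMaster above
      .getOrCreate()
    val sc = spark.sparkContext
    // hbaseSpark4.read_hbase(sc, "table1")
    // hbaseSpark4.save_hbase(sc, "table2")
    spark.stop()
  }
}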