1. 程式人生 > >Spark Kudu 結合

Spark Kudu 結合

ble strong mes let while select ntb 核心 fontsize

Kudu的背景

Hadoop中有很多組件,為了實現復雜的功能通常都是使用混合架構,

  • Hbase:實現快速插入和修改,對大量的小規模查詢也很迅速
  • HDFS/Parquet + Impala/Hive:對超大的數據集進行查詢分析,對於這類場景, Parquet這種列式存儲文件格式具有極大的優勢。
  • HDFS/Parquet + Hbase:這種混合架構需要每隔一段時間將數據從hbase導出成Parquet文件,然後用impala來實現復雜的查詢分析
    以上的架構沒辦法把復雜的實時查詢集成在Hbase上

技術分享

技術分享

Kudu的設計

  • Kudu是對HDFS和HBase功能上的補充,能提供快速的分析和實時計算能力,並且充分利用CPU和I/O資源,支持數據原地修改,支持簡單的、可擴展
    的數據模型。
  • Kudu的定位是提供”fast analytics on fast data”,kudu期望自己既能夠滿足分析的需求(快速的數據scan),也能夠滿足查詢的需求(快速的隨機訪問)。它定位OLAP和少量的OLTP工作流,如果有大量的random accesses,官方建議還是使用HBase最為合適

技術分享

技術分享

Kudu的結構

技術分享

其實跟Hbase是有點像的

Kudu的使用

1:支持主鍵(類似 關系型數據庫)
2:支持事務操作,可對數據增刪改查數據
3:支持各種數據類型
4:支持 alter table。可刪除列(非主鍵)
5:支持 INSERT, UPDATE, DELETE, UPSERT
6:支持Hash,Range分區
進入Impala-shell -i node1ip


具體的CURD語法可以查詢官方文檔,我就不一一列了
http://kudu.apache.org/docs/kudu_impala_integration.html
建表
Create table kudu_table (Id string,Namestring,Age int,
Primary key(id,name)
)partition by hash partitions 16
Stored as kudu;
插入數據
Insert into kudu_table
Select * from impala_table;
註意
以上的sql語句都是在impala裏面執行的。Kudu和hbase一樣都是nosql查詢的,Kudu本身只提供api。impala集成了kudu。
技術分享

Kudu Api

奉上我的Git地址:
https://github.com/LinMingQiang/spark-util/tree/spark-kudu

Scala Api

pom.xml

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-metastore</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-service</artifactId>
    <version>1.1.0</version>
    <exclusions>
        <exclusion>
            <artifactId>servlet-api</artifactId>
            <groupId>javax.servlet</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client</artifactId>
    <version>1.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.kududb</groupId>
    <artifactId>kudu-spark_2.10</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-mapreduce</artifactId>
    <version>1.3.1</version>
    <exclusions>
        <exclusion>
            <artifactId>jsp-api</artifactId>
            <groupId>javax.servlet.jsp</groupId>
        </exclusion>
        <exclusion>
            <artifactId>servlet-api</artifactId>
            <groupId>javax.servlet</groupId>
        </exclusion>
    </exclusions>
        val client = new KuduClientBuilder("master2").build()
    val table = client.openTable("impala::default.kudu_pc_log")
    client.getTablesList.getTablesList.foreach { println }
    val schema = table.getSchema();
    val kp = KuduPredicate.newComparisonPredicate(schema.getColumn("id"), KuduPredicate.ComparisonOp.EQUAL, "1")
    val scanner = client.newScanTokenBuilder(table)
      .addPredicate(kp)
      .limit(100)
      .build()
    val token = scanner.get(0)
    val scan = KuduScanToken.deserializeIntoScanner(token.serialize(), client)
    while (scan.hasMoreRows()) {
      val results = scan.nextRows()
      while (results.hasNext()) {
        val rowresult = results.next();
        println(rowresult.getString("id"))
      }
    }

Spark Kudu Api

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("Test"))
  val sparksql = new SQLContext(sc)
  import sparksql.implicits._
  val a = new KuduContext(kuduMaster, sc)
def getKuduRDD() {
    val tableName = "impala::default.kudu_pc_log"
    val columnProjection = Seq("id", "name")
    val kp = KuduPredicate.newComparisonPredicate(new ColumnSchemaBuilder("id", Type.STRING).build(), KuduPredicate.ComparisonOp.EQUAL, "q")
    val df = a.kuduRDD(sc, tableName, columnProjection,Array(kp))
    df.foreach { x => println(x.mkString(",")) }
  }
 def writetoKudu() {
    val tableName = "impala::default.student"
    val rdd = sc.parallelize(Array("k", "b", "a")).map { n => STU(n.hashCode, n) }
    val data = rdd.toDF()
    a.insertRows(data, tableName)
  }
  case class STU(id: Int, name: String)

小結

    • Kudu簡單來說就是加強版的Hbase,除了像hbase一樣可以高效的單條數據查詢,他的表結構是類型關系型數據庫的。集合impala可以達到復雜sql的實時查詢。適合做OLAP(官方也是這麽定位的)
    • Kudu本質上是將性能的優化,寄托在以列式存儲為核心的基礎上,希望通過提高存儲效率,加快字段投影過濾效率,降低查詢時CPU開銷等來提升性能。而其他絕大多數設計,都是為了解決在列式存儲的基礎上支持隨機讀寫這樣一個目的而存在的。比如類Sql的元數據結構,是提高列式存儲效率的一個輔助手段,唯一主鍵的設定也是配合列式存儲引入的定制策略,至於其他如Delta存儲,compaction策略等都是在這個設定下為了支持隨機讀寫,降低latency不確定性等引入的一些Tradeoff方案。
      官方測試結果上,如果是存粹的隨機讀寫,或者單行的檢索請求這類場景,由於這些Tradeoff的存在,HBASE的性能吞吐率是要優於Kudu不少的(2倍到4倍),kudu的優勢還是在支持類SQL檢索這樣經常需要進行投影操作的批量順序檢索分析場合。目前kudu還處在Incubator階段,並且還沒有成熟的線上應用(小米走在了前面,做了一些業務應用的嘗試),在數據安全,備份,系統健壯性等方面也還要打個問號,所以是否使用kudu,什麽場合,什麽時間點使用,是個需要好好考量的問題 ;)

Spark Kudu 結合