1. 程式人生 > >Hive擴充套件功能(七)--Hive On Spark

Hive擴充套件功能(七)--Hive On Spark

軟體環境:

linux系統: CentOS6.7
Hadoop版本: 2.6.5
zookeeper版本: 3.4.8

主機配置:

一共m1, m2, m3這五部機, 每部主機的使用者名稱都為centos
192.168.179.201: m1 
192.168.179.202: m2 
192.168.179.203: m3 

m1: Zookeeper, Namenode, DataNode, ResourceManager, NodeManager, Master, Worker
m2: Zookeeper, Namenode, DataNode, ResourceManager, NodeManager, Worker
m3: Zookeeper, DataNode, NodeManager, Worker

參考資料:

spark原始碼編譯教程
    http://blog.csdn.net/yanran1991326/article/details/46506595
Hive on Spark搭建教程以及遇到的坑指南
    http://www.cnblogs.com/breg/p/5552342.html(可作為遇到坑時檢視使用)
    http://www.cnblogs.com/linbingdong/p/5806329.html(建議參照該教程)
官方參考配置檔案
    https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-Spark
Hive On Spark官網資料: https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started 關於Hive on Spark的討論和進度(官方): https://issues.apache.org/jira/browse/HIVE-7292

說明:

要使用Hive on Spark,所用的Spark版本必須不包含Hive的相關jar包,Hive官網上指出:Note that you must have a version of Spark which does not include the Hive jars.


在spark官網下載的編譯的Spark都是有整合Hive的,因此需要自己下載原始碼來編譯,並且編譯的時候不指定Hive

一.安裝Maven: (Linux下, 若使用Spark原始碼包自帶的編譯工具,則可跳過此步)

參考資料

Maven教程:
    http://wiki.jikexueyuan.com/project/maven/
  1. 下載Maven安裝包

  2. 解壓Maven到指定位置

  3. 編輯/etc/profile檔案

vi /etc/profile
export M2_HOME=/home/centos/soft/maven
PATH=$PATH:$M2_HOME/bin
source /etc/profile
  1. 檢驗是否安裝成功
mvn -v
  1. 設定maven記憶體大小
Linux下:
vi /etc/profile
export MAVEN_OPTS=-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m
Windows下:
新建變數: MAVEN_OPTS
將變數MAVEN_OPTS的值設定成:  -Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m

二.編譯spark原始碼:

參考資料:

spark原始碼下載官方地址:
    http://spark.apache.org/downloads.html
spark原始碼編譯官方指南:
    http://spark.apache.org/docs/1.5.0/building-spark.html
spark原始碼編譯教程:
    http://blog.csdn.net/yanran1991326/article/details/46506595
Hive On Spark遇坑指南:
    http://www.cnblogs.com/linbingdong/p/5806329.html

1. 編譯指令:

(切記不可用-Phive引數; 推薦使用第一種方案, 因為第二種方案親測編譯成功後各種缺包報錯)
方案一:maven編譯: (Linux下,推薦使用這種方法)
如果想生成一個用scala2.1.2編譯的spark 部署包,則要先執行change-scala-version.sh檔案,執行指令如下:

./dev/change-scala-version.sh 2.10  (若要指定scala的編譯版本時, 必須先執行該指令)
mvn -Phadoop-2.6 -Pyarn -Dhadoop.version=2.6.5 -Dyarn.version=2.6.5 -Dscala-2.10 -DskipTests clean package

指令引數使用介紹:

–Phadoop-$系列:             打包時所用的Hadoop系列號,不加此引數時hadoop為pom.xml的預設系列。 
-Dhadoop.version=$版本號    打包時所用的Hadoop版本號,不加此引數時不可從HDFS上讀取資料。 
–Pyarn                      是否支援Hadoop YARN,不加引數時為不支援yarn。 
-Dyarn.version=$版本號        是否支援Hadoop YARN,不加引數時為不支援yarn排程。 
–Phive                      是否在Spark SQL中支援hive,不加此引數時為不支援hive。(若要使用Hive on Spark功能時, 不能新增次引數)
-Dscala-$版本號              打包時所用的Scala系列號,不加此引數時Scala版本為pom.xml的預設版本, 在使用此函式之前必須先執行./dev/change-scala-version.sh 2.10指令,否則無效    
-DskipTests                是否在編譯的過程中略過測試,加此引數時為略過。


方案二:使用spark原始碼包中自帶的make-distribution編譯工具
編譯Spark原始碼(若需要用到parquet功能,則帶上parquet-provided引數)
(1)Spark2.0版本之前(hadoop版本可隨實際情況修改)

./make-distribution.sh --name "hadoop2-without-hive" --tgz "-Pyarn,hadoop-provided,hadoop-2.4,parquet-provided"

(2)Spark2.0版本之後(hadoop版本可隨實際情況修改)

./dev/make-distribution.sh --name "hadoop2-without-hive" --tgz "-Pyarn,hadoop-provided,hadoop-2.7,parquet-provided"

2. 編譯完成

方案一:使用Maven方式編譯:
編譯成功後的Spark引用包的存放位置:

$Spark原始碼目錄/assembly/target/scala-2.10/spark-assembly-1.6.3-hadoop2.6.5.jar

該包的只是一個資源包, 應把tgz解壓安裝的$SPARK_HOME/lib目錄下的assembly刪除, 然後將該包放入到$SPARK_HOME/lib目錄下

方案二:使用make-distribution.sh方式編譯:
編譯成功後的Spark安裝包的存放位置:

$Spark原始碼目錄/spark-1.6.0-bin-hadoop2-without-hive-src.tgz

該包是一個安裝包, 用tar解壓出安裝即可, 不推薦使用

3.更新說明

更新之前有個BUG, 就是在此之前編譯的Spark部署包並不能操作Hive中Paruqet, 個人猜測的原因是: 因為我之前推薦的是Maven來編譯Spark部署包, 但是在編譯的包裡並沒有支援ParquetJAR包,所以當操作Parquet表時就會報錯, 在此進行BUG修復, 若不使用parquet儲存格式, 則可繼續用以前的方式編譯,並無大礙
想要使用Parquet儲存方式, 本人目前掌握的方法是:
使用spark原始碼包中自帶的make-distribution編譯工具, 編譯指令為:

./dev/make-distribution.sh --name "hadoop2-without-hive" --tgz "-Pyarn,hadoop-provided,hadoop-2.6,parquet-provided"

編譯建議:

在此假設你編譯成功, 如果編譯不成功除了對記憶體溢位或者報了很明顯的錯誤之外, 其他錯誤的處理辦法都是不斷的重試,重試,再重試,不斷的輸入編譯指令進行編譯,就能編譯成功了,編譯時間長短不定,建議編譯成功之後別把spark的原始碼目錄刪除, 因為只要沒刪除以後編譯相同版本的就會容易很多

在之前說過, 用make-distribution.sh編譯的Spark部署包是會報錯的, 那麼下面對報錯進行解決:
問題1:
啟動spark叢集時報錯,啟動Master節點,報錯資訊為:

Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/Logger
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
        at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
        at java.lang.Class.getMethod0(Class.java:3018)
        at java.lang.Class.getMethod(Class.java:1784)
        at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
        at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.slf4j.Logger
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 7 more

解決方案:

/home/centos/soft/hadoop/share/hadoop/common/lib目錄下的slf4j-api-1.7.5.jar檔案,slf4j-log4j12-1.7.5.jar檔案和commons-logging-1.1.3.jar檔案拷貝到/home/centos/soft/spark/lib目錄下

問題2:
啟動spark叢集時報錯,啟動Master節點,報錯資訊為:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Class.java:2570)
        at java.lang.Class.getMethod0(Class.java:2813)
        at java.lang.Class.getMethod(Class.java:1663)
        at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
        at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 6 more

解決方案:

官網參考:
    https://spark.apache.org/docs/latest/hadoop-provided.html#apache-hadoop

編輯spark-env.sh檔案,新增下列該項:

vi  $SPARK_HOME/conf/spark-env.sh
export  SPARK_DIST_CLASSPATH=$(/home/centos/soft/hadoop/bin/hadoop classpath)

三.配置YARN

若Hive on Spark使用YARN作為排程器,則配置YARN,否則,跳過此步驟,不進行配置.

官方參考資料:
    https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html
    https://hadoop.apache.org/docs/r2.7.1/hadoop-yarn/hadoop-yarn-site/FairScheduler.html

四.配置Hive

1.導包到HIVE_HOME/lib目錄下

(1)在Hive-2.2.0版本之前

將$SPARK_HOME/lib目錄下assembly的jar包連結到HIVE_HOME/lib目錄下

(2)在Hive-2.2.0版本之後,
yarn模式:

將scala-library, spark-core, spark-network-common包連結到HIVE_HOME/lib目錄下

本地模式:

chill-java, chill, jackson-module-paranamer, jackson-module-scala, ersey-container-servlet-core, jersey-server, json4s-ast, kryo-shaded, minlog, scala-xml, spark-launcher, spark-network-shuffle, park-unsafe, xbean-asm5-shaded包連結到HIVE_HOME/lib目錄下

2.上傳JAR包到HDFS上

(1)在Hive-2.2.0版本之前,
$HIVE_HOME/lib目錄下assembly的包上傳到HDFS上,並在hive-site.xml檔案中配置assembly包位置:

<property>
    <name>spark.yarn.jar</name>
    <value>hdfs://ns1/spark-assembly.jar</value>
</property>

(2)在Hive-2.2.0版本之後,
將$SPARK_HOME/lib目錄下的所有包上傳到HDFS上,並在hive-site.xml檔案中配置以下配置項:

<property>
    <name>spark.yarn.jars</name>
    <value>hdfs://ns1/spark-jars/*</value>
</property>

3.綜上所述:

Hive的配置項為以下配置, 編輯hive-site.xml檔案 (以下配置是Spark2.2.0之前的YARN模式下的配置)

<!-- property>
    <name>spark.master</name>
    <value>yarn-cluster</value>
    <description> YARN Model </description>
</property -->
<property>
    <name>spark.master</name>
    <value>spark://m1:7077</value>
    <description> Standalone Model </description>
</property>
<property>
    <name>spark.home</name>             
    <value>/home/centos/soft/spark/</value>
</property>
<property>
    <name>hive.execution.engine</name>
    <value>spark</value>
</property>
<property>
    <name>spark.enentLog.enabled</name>
    <value>true</value>
</property>
<property>
    <name>spark.enentLog.dir</name>
    <value>/home/centos/soft/spark/logs/enentLogDir</value>
</property>
<property>
    <name>spark.serializer</name>                   
    <value>org.apache.spark.serializer.KryoSerializer</value>
</property>
<property>
  <name>spark.executor.extraJavaOptions</name>
  <value>-XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"</value>
</property>
<property>
    <name>spark.executor.cores</name>                       
    <value>1</value>
</property>
<property>
    <name>spark.executor.instances</name>               
    <value>1</value>
</property>
<property>
    <name>spark.executor.memory</name>                  
    <value>512m</value>
</property>
<property>
    <name>spark.driver.memory</name>
    <value>512m</value>
</property>
<property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>512</value>
</property>
<property>
    <name>spark.yarn.executor.memoryOverhead</name>         
    <value>75</value>
</property>
<property>
    <name>spark.yarn.driver.memoryOverhead</name>           
    <value>75</value>
</property>
<property>
  <name>spark.yarn.jar</name>
  <value>hdfs://ns1/Jar/spark-assembly-1.6.0-hadoop2.6.0.jar</value>
</property>
<property>
  <name>hive.spark.client.channel.log.level</name>
  <value>DEBUG</value>
</property>
<property>
  <name>hive.spark.client.rpc.server.address</name>
  <value>m1</value>
</property>
<property>
  <name>spark.driver.extraJavaOptions</name>
  <value>-XX:PermSize=128M -XX:MaxPermSize=512M</value>
</property>
<!-- 索引用到的配置項 -->
<property>
    <name>hive.optimize.index.filter</name>
    <value>true</value>
</property>
<property>
    <name>hive.optimize.index.groupby</name>
    <value>true</value>
</property>

三.配置Spark

舉例說明:

  1. 當在YARN模式下執行Spark時,我們通常建議將spark.executor.cores設定為5,6或7(在spark-default.conf中設定),這取決於典型節點可以被整除。例如,如果yarn.nodemanager.resource.cpu-vcores19,那麼6是一個更好的選擇(所有執行器只能有相同數量的核心,這裡如果我們選擇5,那麼每個執行器只有3個核心;如果我們選擇7,則只使用2個執行器,並且將浪費5個核心)。如果它是20,那麼5是一個更好的選擇(因為這樣你會得到4個執行者,沒有核心被浪費)。

  2. 對於spark.executor.memory,我們建議計算yarn.nodemanager.resource.memory-mb *(spark.executor.cores / yarn.nodemanager.resource.cpu-vcores),然後在spark.executor.memory和spark之間分割yarn.executor.memoryOverhead。根據我們的實驗,我們建議將spark.yarn.executor.memoryOverhead設定為總記憶體的15-20%

  3. 在確定了每個執行器接收到多少記憶體之後,您需要決定將多少個執行器分配給查詢。在GA發行Spark動態執行器分配將被支援。然而對於這個測試,只能使用靜態資源分配。基於每個節點中的實體記憶體和spark.executor.memoryspark.yarn.executor.memoryOverhead的配置,您將需要選擇例項數並設定spark.executor.instances

  4. 現在一個現實世界的例子。假設有10個節點,每個節點具有64GB的記憶體,具有12個虛擬核心,例如,yarn.nodemanager.resource.cpu-vcores = 12。一個節點將用作主節點,因此叢集將具有9個從節點。我們將spark.executor.cores配置為6.給定64GBram yarn.nodemanager.resource.memory-mb將是50GB。我們將確定每個執行程式的記憶體量,如下所示:50GB *(6/12)= 25GB。我們將20%分配給spark.yarn.executor.memoryOverhead5120,將80%分配給spark.executor.memory20GB

  5. 在這個9節點叢集上,每個主機有兩個執行器。因此,我們可以將spark.executor.instances配置為介於218之間的某個值。值18將利用整個叢集。

配置項解析:
spark.executor.cores                         Between 5-7, See tuning details section
spark.executor.memory                        yarn.nodemanager.resource.memory-mb * (spark.executor.cores / yarn.nodemanager.resource.cpu-vcores) 
spark.yarn.executor.memoryOverhead           15-20% of spark.executor.memory
spark.executor.instances                     Depends on spark.executor.memory + spark.yarn.executor.memoryOverhead, see tuning details section.
配置項範例:

在spark-default.conf檔案下配置以下配置項:

spark.executor.cores=1
spark.executor.memory=512m
spark.yarn.executor.memoryOverhead=75
spark.executor.instances=1

四.配置HPL/SQL

將HPL/SQL儲存過程呼叫的計算引擎改為Spark, 編輯hplsql-site.xml檔案, 修改下列配置項(因本人使用Hive On Spark, 所以在下列配置項中將mr改為spark, 根據實際情況而定)

<property>
  <name>hplsql.conn.init.hiveconn</name>
  <value>
     set mapred.job.queue.name=default;
     set hive.execution.engine=spark; 
     use default;
  </value>
  <description>Statements for execute after connection to the database</description>
</property>
<property>
  <name>hplsql.conn.init.hive2conn</name>
  <value>
     set mapred.job.queue.name=default;
     set hive.execution.engine=spark; 
     use default;
  </value>
  <description>Statements for execute after connection to the database</description>
</property>

五.測試

重啟metastore, spark-master, spark-slaves服務, 進入Hive測試

sh  $SPARK_HOME/bin/stop-master.sh
sh  $SPARK_HOME/bin/stop-slaves.sh
sh  $SPARK_HOME/bin/start-master.sh
sh  $SPARK_HOME/bin/start-slaves.sh
sh  $HIVE_HOME/bin/hive --service metastore
sh  $HIVE_HOME/bin/hive --service hiveserver2
sh  $HIVE_HOME/bin/hive 
select count(*) from bill_auth;

六.官方推薦配置項:

mapreduce.input.fileinputformat.split.maxsize=750000000
hive.vectorized.execution.enabled=true
hive.cbo.enable=true
hive.optimize.reducededuplication.min.reducer=4
hive.optimize.reducededuplication=true
hive.orc.splits.include.file.footer=false
hive.merge.mapfiles=true
hive.merge.sparkfiles=false
hive.merge.smallfiles.avgsize=16000000
hive.merge.size.per.task=256000000
hive.merge.orcfile.stripe.level=true
hive.auto.convert.join=true
hive.auto.convert.join.noconditionaltask=true
hive.auto.convert.join.noconditionaltask.size=894435328
hive.optimize.bucketmapjoin.sortedmerge=false
hive.map.aggr.hash.percentmemory=0.5
hive.map.aggr=true
hive.optimize.sort.dynamic.partition=false
hive.stats.autogather=true
hive.stats.fetch.column.stats=true
hive.vectorized.execution.reduce.enabled=false
hive.vectorized.groupby.checkinterval=4096
hive.vectorized.groupby.flush.percent=0.1
hive.compute.query.using.stats=true
hive.limit.pushdown.memory.usage=0.4
hive.optimize.index.filter=true
hive.exec.reducers.bytes.per.reducer=67108864
hive.smbjoin.cache.rows=10000
hive.exec.orc.default.stripe.size=67108864
hive.fetch.task.conversion=more
hive.fetch.task.conversion.threshold=1073741824
hive.fetch.task.aggr=false
mapreduce.input.fileinputformat.list-status.num-threads=5
spark.kryo.referenceTracking=false
spark.kryo.classesToRegister=org.apache.hadoop.hive.ql.io.HiveKey,org.apache.hadoop.io.BytesWritable,org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch