hive 常用知識點
hm3
hive建立orc格式表不能像textfile格式一樣直接load資料到表中,一般需要建立臨時textfile表,然後通過insert into 或者insert overwrite到orc儲存格式表中。
臨時表
create table if not exists hm3.hm3_format_log_tmp ( time string, source string, remote_addr string, remote_user string, body_bytes_sent string, request_time string, status string, host string, request_method string, http_referrer string, http_x_forwarded_for string, http_user_agent string, upstream_response_time string, upstream_addr string, ngx_timestamp string, get_type string, data string ) partitioned by (dt string, hour string, msgtype string, action string) row format delimited fields terminated by '\t';
hm3資料表
create external table if not exists hm3.hm3_format_log ( time string, source string, remote_addr string, remote_user string, body_bytes_sent string, request_time string, status string, host string, request_method string, http_referrer string, http_x_forwarded_for string, http_user_agent string, upstream_response_time string, upstream_addr string, ngx_timestamp string, get_type string, data string ) partitioned by (dt string, hour string, msgtype string, action string) row format delimited fields terminated by '\t' stored as orc;
匯入資料
(1) 匯入資料到textfile
load data inpath 'hdfs://path' into table hm3_format_log_tmp partition(dt="2018-06-22",hour="00",msgtype="web", action="click");
(2)查詢資料插入orc格式表
insert into hm3.hm3_format_log partition(dt="2018-06-22",hour="00",msgtype="web", action="click") select time, source, remote_addr, remote_user, body_bytes_sent, request_time, status, host, request_method, http_referrer, http_x_forwarded_for, http_user_agent, upstream_response_time, upstream_addr, ngx_timestamp,get_type, data from hm3.hm3_format_log_tmp where dt = "2018-06-22" and hour = "00" and msgtype = "web" and action = "click";
表新增欄位
alter table hm2.helper add columns(country string, province string, city string);
hive新增欄位後,前面資料會有空值,就算將前面資料hdfs檔案刪除,重新匯入,仍然查詢出來是 NULL,這個問題有待解決。
- 解決
- 未解決
刪除欄位
CREATE TABLE test (
creatingTs BIGINT,
a STRING,
b BIGINT,
c STRING,
d STRING,
e BIGINT,
f BIGINT
);
如果需要刪除 column f 列,可以使用以下語句:
ALTER TABLE test REPLACE COLUMNS (
creatingTs BIGINT,
a STRING,
b BIGINT,
c STRING,
d STRING,
e BIGINT
);
增加列:
alter table of_test columns (judegment int)
UDF
add jar /home/hadoop/codejar/flash_format.jar;
create temporary function gamelabel as 'com.js.dataclean.hive.udf.hm2.GameLabel';
檢視函式用法:
查month 相關的函式
show functions like '*month*'
檢視 add_months 函式的用法
desc function add_months;
檢視 add_months 函式的詳細說明並舉例
desc function extended add_months;
UDAF
關於UDAF開發注意點:
1.需要import org.apache.hadoop.hive.ql.exec.UDAF以及org.apache.hadoop.hive.ql.exec.UDAFEvaluator,這兩個包都是必須的
2.函式類需要繼承UDAF類,內部類Evaluator實現UDAFEvaluator介面
3.Evaluator需要實現 init、iterate、terminatePartial、merge、terminate這幾個函式
1)init函式類似於建構函式,用於UDAF的初始化
2)iterate接收傳入的引數,並進行內部的輪轉。其返回型別為boolean
3)terminatePartial無引數,其為iterate函式輪轉結束後,返回亂轉資料,iterate和terminatePartial類似於hadoop的Combiner
4)merge接收terminatePartial的返回結果,進行資料merge操作,其返回型別為boolean
5)terminate返回最終的聚集函式結果
hive hbase 關聯
create 'flash_people','info','label'
create external table if not exists hm2.flash_people(
guid string comment "people guid",
firsttime string comment "首次入庫時間",
ip string comment "使用者ip",
jstimestamp bigint comment "json時間戳"
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties("hbase.columns.mapping" = ":key,info:firsttime,info:ip,:timestamp")
tblproperties("hbase.table.name" = "hm2:flash_people");
hive -e 'set mapred.reduce.tasks = 30;insert into hm2.flash_people select guid,dt,remote_addr,(32523145869-ngx_timestamp) from hm2.data where dt = "2018-07-01" and length(guid) = 38 and ngx_timestamp is not null and ngx_timestamp != '' and ngx_timestamp is regexp '\\d{8}' and remote_addr is not null and remote_addr != '';'
set mapred.reduce.tasks = 30;insert into hm2.flash_people select guid,dt,remote_addr,(32523145869-ngx_timestamp) from hm2.data where dt = "2018-07-01" and length(guid) = 38 and ngx_timestamp is not null and ngx_timestamp != '' and ngx_timestamp rlike'^\\d+$' and remote_addr is not null and remote_addr != '';
hive一些優化引數
set hive.auto.convert.join = false;
set hive.ignore.mapjoin.hint=false;
set hive.exec.parallel=true;
set mapred.reduce.tasks = 60;
新增欄位
alter table hm2.helper add columns(country string, province string, city string);
hive新增欄位後,前面資料會有空值,就算將前面資料hdfs檔案刪除,重新匯入,仍然查詢出來是 NULL,這個問題有待解決。
- 解決
- 未解決
刪除欄位
CREATE TABLE test (
creatingTs BIGINT,
a STRING,
b BIGINT,
c STRING,
d STRING,
e BIGINT,
f BIGINT
);
如果需要刪除 column f 列,可以使用以下語句:
ALTER TABLE test REPLACE COLUMNS (
creatingTs BIGINT,
a STRING,
b BIGINT,
c STRING,
d STRING,
e BIGINT
);
增加列:
alter table of_test columns (judegment int)
hive 支援insert,update,delete的配置
hive-site.xml中新增配置
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>1</value>
</property>
<property>
<name>hive.enforce.bucketing</name>
<value>true</value>
</property>
建表語句
create external table if not exists hm2.history_helper
(
guid string,
starttime string,
endtime string,
num int
)
clustered by(guid) into 50 buckets
stored as orc TBLPROPERTIES ('transactional'='true');
說明:建表語句必須帶有into buckets
子句和stored as orc TBLPROPERTIES ('transactional'='true')
子句,並且不能帶有sorted by
子句。
這樣,這個表就可以就行insert,update,delete操作了。
使用者表
flash_user
hbase
create 'hm2:flash_user','info','label'
hive
create external table if not exists hm2.flash_user(
guid string comment "使用者的 guid",
starttime string comment "首次入庫時間",
endtime string comment "最後一次訪問時間",
num int comment "訪問天數",
country string,
province string,
city string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties("hbase.columns.mapping" = ":key,info:starttime,info:endtime,info:num,info:country, info:province, info:cify")
tblproperties("hbase.table.name" = "hm2:flash_user");
insert into hm2.flash_user select guid, starttime, endtime, num from hm2.history_helper where length(guid) == 38;
hive表中的鎖
場景:
在執行insert into或insert overwrite任務時,中途手動將程式停掉,會出現卡死情況(無法提交MapReduce),只能執行查詢操作,而drop insert操作均不可操作,無論執行多久,都會保持卡死狀態
臨時解決辦法是……把表名換一個……
根本原因是:hive表被鎖或者某個分割槽被鎖,需要解鎖
show locks 表名:
可以查看錶被鎖的情況
解鎖
unlock table 表名; -- 解鎖表
unlock table 表名 partition(dt='2014-04-01'); -- 解鎖某個分割槽
注意
表鎖和分割槽鎖是兩個不同的鎖,對錶解鎖,對分割槽是無效的,分割槽需要單獨解鎖
高版本hive預設插入資料時,不能查詢,因為有鎖
hive> show locks;
OK
[email protected] EXCLUSIVE
解決辦法:關閉鎖機制
set hive.support.concurrency=false; 預設為true
基本知識
查看錶結構資訊
desc formatted table_name;
desc table_name;
data game label 表
create external table if not exists hm2.game_label
(
guid string,
label string
)
partitioned by (dt string)
row format delimited fields terminated by '\t';
匯入資料到hive表
load命令
hive -e 'load data inpath "/data/MROutput/hm2_data_gamelabel_output2/part-r-*" into table hm2.game_label partition(dt="2018-10-11");'
注意:inpath 後接的hdfs路徑需要引號
python中的hive命令字串示例:
cmd = 'hive -e \'load data inpath "%s/part-r-*" into table hm2.game_label partition(dt=%s);\''%(outpath, formatDate(day))
orc格式表
hive建立orc格式表不能像textfile格式一樣直接load資料到表中,一般需要load建立臨時textfile表,然後通過insert into 或者insert overwrite到orc儲存格式表中。
map,reduce知識
- 什麼情況下只有一個reduce?
很多時候你會發現任務中不管資料量多大,不管你有沒有設定調整reduce個數的引數,任務中一直都只有一個reduce任務;其實只有一個reduce任務的情況,除了資料量小於hive.exec.reducers.bytes.per.reducer
引數值的情況外,還有以下原因:
a) 沒有group by的彙總,比如把select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt; 寫成 select count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04';
這點非常常見,希望大家儘量改寫。
b) 用了Order by
c) 有笛卡爾積
通常這些情況下,除了找辦法來變通和避免,我暫時沒有什麼好的辦法,因為這些操作都是全域性的,所以hadoop不得不用一個reduce去完成;
同樣的,在設定reduce個數的時候也需要考慮這兩個原則:使大資料量利用合適的reduce數;使單個reduce任務處理合適的資料量。
hive 優化
- hive mapreduce引數優化
設定map,reduce任務分配的資源
set mapreduce.map.memory.mb = 4096 ;
set mapreduce.reduce.memory.mb = 4096 ;
- hive.exec.parallel引數控制在同一個sql中的不同的job是否可以同時執行,預設為false.
下面是對於該引數的測試過程:
測試sql:
select r1.a
from
(select t.a from sunwg_10 t join sunwg_10000000 s on t.a=s.b) r1
join
(select s.b from sunwg_100000 t join sunwg_10 s on t.a=s.b) r2
on (r1.a=r2.b);
- 當引數為false的時候,三個job是順序的執行
set hive.exec.parallel=false;
- 但是可以看出來其實兩個子查詢中的sql並無關係,可以並行的跑
set hive.exec.parallel=true;
總結:
在資源充足的時候hive.exec.parallel會讓那些存在併發job的sql執行得更快,但同時消耗更多的資源
可以評估下hive.exec.parallel對我們的重新整理任務是否有幫助.
- 引數設定
set mapred.max.split.size=256000000; -- 決定每個map處理的最大的檔案大小,單位為B
hive on spark 知識
cdh 6.0.1 下通過設定:
set hive.execution.engine=spark;
可以將預設的mapreduce執行引擎切換為spark;
apache hadoop 下配置 hive on spark
引數調優
瞭解完了Spark作業執行的基本原理之後,對資源相關的引數就容易理解了。所謂的Spark資源引數調優,其實主要就是對Spark執行過程中各個使用資源的地方,通過調節各種引數,來優化資源使用的效率,從而提升Spark作業的執行效能。以下引數就是Spark中主要的資源引數,每個引數都對應著作業執行原理中的某個部分。
num-executors/spark.executor.instances
- 引數說明:該引數用於設定Spark作業總共要用多少個Executor程序來執行。Driver在向YARN叢集管理器申請資源時,YARN叢集管理器會盡可能按照你的設定來在叢集的各個工作節點上,啟動相應數量的Executor程序。這個引數非常之重要,如果不設定的話,預設只會給你啟動少量的Executor程序,此時你的Spark作業的執行速度是非常慢的。
- 引數調優建議:每個Spark作業的執行一般設定50~100個左右的Executor程序比較合適,設定太少或太多的Executor程序都不好。設定的太少,無法充分利用叢集資源;設定的太多的話,大部分佇列可能無法給予充分的資源。
executor-memory/spark.executor.memory
- 引數說明:該引數用於設定每個Executor程序的記憶體。Executor記憶體的大小,很多時候直接決定了Spark作業的效能,而且跟常見的JVM OOM異常,也有直接的關聯。
- 引數調優建議:每個Executor程序的記憶體設定4G8G較為合適。但是這只是一個參考值,具體的設定還是得根據不同部門的資源佇列來定。可以看看自己團隊的資源佇列的最大記憶體限制是多少,num-executors乘以executor-memory,是不能超過佇列的最大記憶體量的。此外,如果你是跟團隊裡其他人共享這個資源佇列,那麼申請的記憶體量最好不要超過資源佇列最大總記憶體的1/31/2,避免你自己的Spark作業佔用了佇列所有的資源,導致別的同學的作業無法執行。
executor-cores/spark.executor.cores
- 引數說明:該引數用於設定每個Executor程序的CPU core數量。這個引數決定了每個Executor程序並行執行task執行緒的能力。因為每個CPU core同一時間只能執行一個task執行緒,因此每個Executor程序的CPU core數量越多,越能夠快速地執行完分配給自己的所有task執行緒。
- 引數調優建議:Executor的CPU core數量設定為2~4個較為合適。同樣得根據不同部門的資源佇列來定,可以看看自己的資源佇列的最大CPU core限制是多少,再依據設定的Executor數量,來決定每個Executor程序可以分配到幾個CPU core。同樣建議,如果是跟他人共享這個佇列,那麼num-executors * executor-cores不要超過佇列總CPU core的1/3~1/2左右比較合適,也是避免影響其他同學的作業執行。
driver-memory
- 引數說明:該引數用於設定Driver程序的記憶體。
- 引數調優建議:Driver的記憶體通常來說不設定,或者設定1G左右應該就夠了。唯一需要注意的一點是,如果需要使用collect運算元將RDD的資料全部拉取到Driver上進行處理,那麼必須確保Driver的記憶體足夠大,否則會出現OOM記憶體溢位的問題。
spark.default.parallelism
- 引數說明:該引數用於設定每個stage的預設task數量。這個引數極為重要,如果不設定可能會直接影響你的Spark作業效能。
- 引數調優建議:Spark作業的預設task數量為500~1000個較為合適。很多同學常犯的一個錯誤就是不去設定這個引數,那麼此時就會導致Spark自己根據底層HDFS的block數量來設定task的數量,預設是一個HDFS block對應一個task。通常來說,Spark預設設定的數量是偏少的(比如就幾十個task),如果task數量偏少的話,就會導致你前面設定好的Executor的引數都前功盡棄。試想一下,無論你的Executor程序有多少個,記憶體和CPU有多大,但是task只有1個或者10個,那麼90%的Executor程序可能根本就沒有task執行,也就是白白浪費了資源!因此Spark官網建議的設定原則是,設定該引數為num-executors * executor-cores的2~3倍較為合適,比如Executor的總CPU core數量為300個,那麼設定1000個task是可以的,此時可以充分地利用Spark叢集的資源。
spark.storage.memoryFraction
- 引數說明:該引數用於設定RDD持久化資料在Executor記憶體中能佔的比例,預設是0.6。也就是說,預設Executor 60%的記憶體,可以用來儲存持久化的RDD資料。根據你選擇的不同的持久化策略,如果記憶體不夠時,可能資料就不會持久化,或者資料會寫入磁碟。
- 引數調優建議:如果Spark作業中,有較多的RDD持久化操作,該引數的值可以適當提高一些,保證持久化的資料能夠容納在記憶體中。避免記憶體不夠快取所有的資料,導致資料只能寫入磁碟中,降低了效能。但是如果Spark作業中的shuffle類操作比較多,而持久化操作比較少,那麼這個引數的值適當降低一些比較合適。此外,如果發現作業由於頻繁的gc導致執行緩慢(通過spark web ui可以觀察到作業的gc耗時),意味著task執行使用者程式碼的記憶體不夠用,那麼同樣建議調低這個引數的值。
spark.shuffle.memoryFraction
- 引數說明:該引數用於設定shuffle過程中一個task拉取到上個stage的task的輸出後,進行聚合操作時能夠使用的Executor記憶體的比例,預設是0.2。也就是說,Executor預設只有20%的記憶體用來進行該操作。shuffle操作在進行聚合時,如果發現使用的記憶體超出了這個20%的限制,那麼多餘的資料就會溢寫到磁碟檔案中去,此時就會極大地降低效能。
- 引數調優建議:如果Spark作業中的RDD持久化操作較少,shuffle操作較多時,建議降低持久化操作的記憶體佔比,提高shuffle操作的記憶體佔比比例,避免shuffle過程中資料過多時記憶體不夠用,必須溢寫到磁碟上,降低了效能。此外,如果發現作業由於頻繁的gc導致執行緩慢,意味著task執行使用者程式碼的記憶體不夠用,那麼同樣建議調低這個引數的值。
Spark On Yarn執行中executor記憶體限制問題
解決Spark On Yarn執行中executor記憶體限制問題
叢集版本 Spark 2.2.0 + Hadoop 3.0-CDH6.0.1
hive on saprk , 設定:
hive> set hive.execution.engine=spark;
hive> set spark.executor.memory=31.5g;
hive> set spark.executor.cores=11;
hive> set spark.serializer=org.apache.spark.serializer.KryoSerializer;
提示記憶體不足
Failed to execute spark task, with exception 'org.apache.hadoop.hive.ql.metadata.HiveException(Failed to create Spark client for Spark session 50288c8b-96aa-44ad-9eea-3cb4abb1ae5b)'
FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client for Spark session 50288c8b-96aa-44ad-9eea-3cb4abb1ae5b
解決方案,修改Yarn的配置檔案:
1、yarn.nodemanager.resource.memory-mb
容器記憶體
設定為 至少 : executor-memory(15g) + driver(512m)
的記憶體,如上例可配置為 16g
2、yarn.scheduler.maximum-allocation-mb
最大容器記憶體
設定為 至少 : executor-memory(15g) + driver(512m)
的記憶體,如上例可配置為 16g
第一個引數為NodeManager
的配置 ,第二個引數為 ResourceManager
的配置。
字串處理
字串連線:
concat(str, str2, str3,...) 字串連線
concat_ws(separator, str, str2, str3, ...) 將字串用separator作為間隔連線起來
字串擷取
substr(s, 0, 1) 擷取第一個字元
substr(s, -1) 擷取最後一個字元
hive 中 join
mapjoin的優化在於,在mapreduce task開始之前,建立一個local task,小表以hshtable的形式載入到記憶體,然後序列化到磁碟,把記憶體的hashtable壓縮為tar檔案。然後把檔案分發到 Hadoop Distributed Cache,然後傳輸給每一個mapper,mapper在本地反序列化檔案並載入進記憶體在做join
sql
select workflow,count(workflow) from (select guid, substr(workflow, -1) workflow from hm2.workflow_list) m right join hm2.helper helper on m.guid = helper.guid and helper.dt = "2018-10-21" group by workflow;
記憶體溢位解決辦法:
set hive.auto.convert.join = false;
set hive.ignore.mapjoin.hint=false;
set hive.exec.parallel=true;
Hive中Join的原理和機制
籠統的說,Hive中的Join可分為Common Join(Reduce階段完成join)和Map Join(Map階段完成join)。本文簡單介紹一下兩種join的原理和機制。
Hive Common Join
如果不指定MapJoin或者不符合MapJoin的條件,那麼Hive解析器會將Join操作轉換成Common Join,即:在Reduce階段完成join.
整個過程包含Map、Shuffle、Reduce階段。
- Map階段
讀取源表的資料,Map輸出時候以Join on條件中的列為key,如果Join有多個關聯鍵,則以這些關聯鍵的組合作為key;
Map輸出的value為join之後所關心的(select或者where中需要用到的)列;同時在value中還會包含表的Tag資訊,用於標明此value對應哪個表;
按照key進行排序
-
Shuffle階段
根據key的值進行hash,並將key/value按照hash值推送至不同的reduce中,這樣確保兩個表中相同的key位於同一個reduce中 -
Reduce階段
根據key的值完成join操作,期間通過Tag來識別不同表中的資料。
以下面的HQL為例,圖解其過程:
SELECT
a.id,a.dept,b.age
FROM a join b
ON (a.id = b.id);
Hive Map Join
MapJoin通常用於一個很小的表和一個大表進行join的場景,具體小表有多小,由引數hive.mapjoin.smalltable.filesize來決定,該引數表示小表的總大小,預設值為25000000位元組,即25M。
Hive0.7之前,需要使用hint提示 /*+ mapjoin(table) */才會執行MapJoin,否則執行Common Join,但在0.7版本之後,預設自動會轉換Map Join,由引數hive.auto.convert.join來控制,預設為true.
仍然以9.1中的HQL來說吧,假設a表為一張大表,b為小表,並且hive.auto.convert.join=true,那麼Hive在執行時候會自動轉化為MapJoin。
如圖中的流程,首先是Task A,它是一個Local Task(在客戶端本地執行的Task),負責掃描小表b的資料,將其轉換成一個HashTable的資料結構,並寫入本地的檔案中,之後將該檔案載入到DistributeCache中,該HashTable的資料結構可以抽象為:
key | value |
---|---|
1 | 26 |
2 | 34 |
- 接下來是Task B,該任務是一個沒有Reduce的MR,啟動MapTasks掃描大表a,在Map階段,根據a的每一條記錄去和DistributeCache中b表對應的HashTable關聯,並直接輸出結果。
- 由於MapJoin沒有Reduce,所以由Map直接輸出結果檔案,有多少個Map Task,就有多少個結果檔案。
轉義字元
hive> select split('a:1||b:2||c:3','\\|\\|') from hm2.test;
OK
["a:1","b:2","c:3"]
["a:1","b:2","c:3"]
["a:1","b:2","c:3"]
["a:1","b:2","c:3"]
insert table select from
insert into tbName select * from tbName2;
insert overwrite table tbName select * from tbName2;
insert overwrite例子
insert overwrite table hm2.helper partition(dt = '2018-06-22', hour = '09',msgtype = 'helper') select time,source,remote_addr,remote_user,body_bytes_sent,request_time,status,host,request_method,http_referrer,http_x_forwarded_for,http_user_agent,upstream_response_time,upstream_addr,guid,helperversion,osversion,ngx_timestamp,get_type,split(ip2area(http_x_forwarded_for,remote_addr), "\t")[0] country,split(ip2area(http_x_forwarded_for,remote_addr), "\t")[1] province,split(ip2area(http_x_forwarded_for,remote_addr), "\t")[2] city from hm2.helper where dt = '2018-06-22' and hour = '09' and msgtype = 'helper';
插入分割槽表,不用指定分割槽,可以自動識別
INSERT overwrite TABLE test.dis_helper PARTITION (dt,hour,msgtype) select `(num)?+.+` from (select *,row_number() over (partition by guid order by time asc) num from hm2.helper where dt ='2018-09-06'and hour between '00' and '23' and msgtype='helper') t where t.num=1;
這裡把資料去重,插入分割槽表test.dis_helper中,自動根據dt,hour,msgtype欄位的取值進入分割槽表,並且(num)?+.+
表示除了num
這個欄位。