1. 程式人生 > >hive 常用知識點

hive 常用知識點

hm3

hive建立orc格式表不能像textfile格式一樣直接load資料到表中,一般需要建立臨時textfile表,然後通過insert into 或者insert overwrite到orc儲存格式表中。

臨時表

create table if not exists hm3.hm3_format_log_tmp
(
time string,
source string,
remote_addr string,
remote_user string,
body_bytes_sent string,
request_time string,
status string,
host string,
request_method string,
http_referrer string,
http_x_forwarded_for string,
http_user_agent string,
upstream_response_time string,
upstream_addr string,
ngx_timestamp string,
get_type string,
data string
)
partitioned by (dt string, hour string, msgtype string, action string)
row format delimited fields terminated by '\t';

hm3資料表

create external table if not exists hm3.hm3_format_log
(
time string,
source string,
remote_addr string,
remote_user string,
body_bytes_sent string,
request_time string,
status string,
host string,
request_method string,
http_referrer string,
http_x_forwarded_for string,
http_user_agent string,
upstream_response_time string,
upstream_addr string,
ngx_timestamp string,
get_type string,
data string
)
partitioned by (dt string, hour string, msgtype string, action string)
row format delimited fields terminated by '\t'
stored as orc;

匯入資料

(1) 匯入資料到textfile

load data inpath 'hdfs://path' into table hm3_format_log_tmp partition(dt="2018-06-22",hour="00",msgtype="web", action="click");

(2)查詢資料插入orc格式表

insert into hm3.hm3_format_log partition(dt="2018-06-22",hour="00",msgtype="web", action="click") select time,
source,
remote_addr,
remote_user,
body_bytes_sent,
request_time,
status,
host,
request_method,
http_referrer,
http_x_forwarded_for,
http_user_agent,
upstream_response_time,
upstream_addr,
ngx_timestamp,get_type,
data 
from hm3.hm3_format_log_tmp where dt = "2018-06-22" and hour = "00" 
and msgtype = "web" and action = "click";

表新增欄位

alter table hm2.helper add columns(country string, province string, city string);

hive新增欄位後,前面資料會有空值,就算將前面資料hdfs檔案刪除,重新匯入,仍然查詢出來是 NULL,這個問題有待解決。

  • 解決
  • 未解決

刪除欄位

CREATE TABLE test (
creatingTs BIGINT,
a STRING,
b BIGINT,
c STRING,
d STRING,
e BIGINT,
f BIGINT
);

如果需要刪除 column f 列,可以使用以下語句:

ALTER TABLE test REPLACE COLUMNS (
creatingTs BIGINT,
a STRING,
b BIGINT,
c STRING,
d STRING,
e BIGINT
);

增加列:

alter table of_test columns (judegment int)

UDF

add jar /home/hadoop/codejar/flash_format.jar;
create temporary function gamelabel as 'com.js.dataclean.hive.udf.hm2.GameLabel';

檢視函式用法

查month 相關的函式

show functions like '*month*'

檢視 add_months 函式的用法

desc function add_months;

檢視 add_months 函式的詳細說明並舉例

desc function extended add_months;

UDAF

關於UDAF開發注意點:

1.需要import org.apache.hadoop.hive.ql.exec.UDAF以及org.apache.hadoop.hive.ql.exec.UDAFEvaluator,這兩個包都是必須的

2.函式類需要繼承UDAF類,內部類Evaluator實現UDAFEvaluator介面

3.Evaluator需要實現 init、iterate、terminatePartial、merge、terminate這幾個函式

1)init函式類似於建構函式,用於UDAF的初始化

2)iterate接收傳入的引數,並進行內部的輪轉。其返回型別為boolean

3)terminatePartial無引數,其為iterate函式輪轉結束後,返回亂轉資料,iterate和terminatePartial類似於hadoop的Combiner

4)merge接收terminatePartial的返回結果,進行資料merge操作,其返回型別為boolean

5)terminate返回最終的聚集函式結果  

hive hbase 關聯

create 'flash_people','info','label'

create external table if not exists hm2.flash_people(
guid string comment "people guid",
firsttime string comment "首次入庫時間",
ip string comment "使用者ip",
jstimestamp bigint comment "json時間戳"
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties("hbase.columns.mapping" = ":key,info:firsttime,info:ip,:timestamp")
tblproperties("hbase.table.name" = "hm2:flash_people");

hive -e 'set mapred.reduce.tasks = 30;insert into hm2.flash_people select guid,dt,remote_addr,(32523145869-ngx_timestamp) from hm2.data where dt = "2018-07-01" and length(guid) = 38 and ngx_timestamp is not null and ngx_timestamp != '' and ngx_timestamp is regexp '\\d{8}' and remote_addr is not null and remote_addr != '';'
set mapred.reduce.tasks = 30;insert into hm2.flash_people select guid,dt,remote_addr,(32523145869-ngx_timestamp) from hm2.data where dt = "2018-07-01" and length(guid) = 38 and ngx_timestamp is not null and ngx_timestamp != '' and ngx_timestamp rlike'^\\d+$' and remote_addr is not null and remote_addr != '';

hive一些優化引數

set hive.auto.convert.join = false;
set hive.ignore.mapjoin.hint=false;
set hive.exec.parallel=true;
set mapred.reduce.tasks = 60;

新增欄位

alter table hm2.helper add columns(country string, province string, city string);

hive新增欄位後,前面資料會有空值,就算將前面資料hdfs檔案刪除,重新匯入,仍然查詢出來是 NULL,這個問題有待解決。

  • 解決
  • 未解決

刪除欄位

CREATE TABLE test (
creatingTs BIGINT,
a STRING,
b BIGINT,
c STRING,
d STRING,
e BIGINT,
f BIGINT
);

如果需要刪除 column f 列,可以使用以下語句:

ALTER TABLE test REPLACE COLUMNS (
creatingTs BIGINT,
a STRING,
b BIGINT,
c STRING,
d STRING,
e BIGINT
);

增加列:

alter table of_test columns (judegment int)

hive 支援insert,update,delete的配置

hive-site.xml中新增配置

<property>
    <name>hive.support.concurrency</name>
    <value>true</value>
</property>
<property>
    <name>hive.exec.dynamic.partition.mode</name>
    <value>nonstrict</value>
</property>
<property>
    <name>hive.txn.manager</name>
    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
    <name>hive.compactor.initiator.on</name>
    <value>true</value>
</property>
<property>
    <name>hive.compactor.worker.threads</name>
    <value>1</value>
</property>
<property>
  <name>hive.enforce.bucketing</name>
  <value>true</value>
</property>

建表語句

create external table if not exists hm2.history_helper
(
guid string,
starttime string,
endtime string,
num int
)
clustered by(guid) into 50 buckets
stored as orc TBLPROPERTIES ('transactional'='true');

說明:建表語句必須帶有into buckets子句和stored as orc TBLPROPERTIES ('transactional'='true')子句,並且不能帶有sorted by子句。

這樣,這個表就可以就行insert,update,delete操作了。

使用者表

flash_user

hbase

create 'hm2:flash_user','info','label'

hive

create external table if not exists hm2.flash_user(
guid string comment "使用者的 guid",
starttime string comment "首次入庫時間",
endtime string comment "最後一次訪問時間",
num int comment "訪問天數",
country string,
province string,
city string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties("hbase.columns.mapping" = ":key,info:starttime,info:endtime,info:num,info:country, info:province, info:cify")
tblproperties("hbase.table.name" = "hm2:flash_user");
insert into hm2.flash_user select guid, starttime, endtime, num from hm2.history_helper where length(guid) == 38;

hive表中的鎖

場景:

在執行insert into或insert overwrite任務時,中途手動將程式停掉,會出現卡死情況(無法提交MapReduce),只能執行查詢操作,而drop insert操作均不可操作,無論執行多久,都會保持卡死狀態

臨時解決辦法是……把表名換一個……

根本原因是:hive表被鎖或者某個分割槽被鎖,需要解鎖

show locks 表名:

可以查看錶被鎖的情況

解鎖

unlock table 表名;  -- 解鎖表
unlock table 表名 partition(dt='2014-04-01');  -- 解鎖某個分割槽

表鎖和分割槽鎖是兩個不同的鎖,對錶解鎖,對分割槽是無效的,分割槽需要單獨解鎖

高版本hive預設插入資料時,不能查詢,因為有鎖

hive> show locks;
OK
[email protected]	EXCLUSIVE

解決辦法:關閉鎖機制

set hive.support.concurrency=false; 預設為true

基本知識

查看錶結構資訊

 desc formatted table_name;
 desc table_name;

data game label 表

create external table if not exists hm2.game_label
(
guid string,
label string
)
partitioned by (dt string)
row format delimited fields terminated by '\t';

匯入資料到hive表

load命令

hive -e 'load data inpath "/data/MROutput/hm2_data_gamelabel_output2/part-r-*" into table hm2.game_label partition(dt="2018-10-11");'

注意:inpath 後接的hdfs路徑需要引號

python中的hive命令字串示例:

cmd = 'hive -e \'load data inpath "%s/part-r-*" into table hm2.game_label partition(dt=%s);\''%(outpath, formatDate(day))

orc格式表

hive建立orc格式表不能像textfile格式一樣直接load資料到表中,一般需要load建立臨時textfile表,然後通過insert into 或者insert overwrite到orc儲存格式表中。

map,reduce知識

  1. 什麼情況下只有一個reduce?

很多時候你會發現任務中不管資料量多大,不管你有沒有設定調整reduce個數的引數,任務中一直都只有一個reduce任務;其實只有一個reduce任務的情況,除了資料量小於hive.exec.reducers.bytes.per.reducer引數值的情況外,還有以下原因:

a) 沒有group by的彙總,比如把select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt; 寫成 select count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04'; 
這點非常常見,希望大家儘量改寫。 
b) 用了Order by 
c) 有笛卡爾積 
通常這些情況下,除了找辦法來變通和避免,我暫時沒有什麼好的辦法,因為這些操作都是全域性的,所以hadoop不得不用一個reduce去完成; 
同樣的,在設定reduce個數的時候也需要考慮這兩個原則:使大資料量利用合適的reduce數;使單個reduce任務處理合適的資料量。

hive 優化

  1. hive mapreduce引數優化
    設定map,reduce任務分配的資源
set mapreduce.map.memory.mb = 4096 ;
set mapreduce.reduce.memory.mb = 4096 ; 
  1. hive.exec.parallel引數控制在同一個sql中的不同的job是否可以同時執行,預設為false.

下面是對於該引數的測試過程:

測試sql:

select r1.a 
from 
(select t.a from sunwg_10 t join sunwg_10000000 s on t.a=s.b) r1 
join 
(select s.b from sunwg_100000 t join sunwg_10 s on t.a=s.b) r2 
on (r1.a=r2.b);
  • 當引數為false的時候,三個job是順序的執行
set hive.exec.parallel=false;
  • 但是可以看出來其實兩個子查詢中的sql並無關係,可以並行的跑
set hive.exec.parallel=true;

總結:
在資源充足的時候hive.exec.parallel會讓那些存在併發job的sql執行得更快,但同時消耗更多的資源
可以評估下hive.exec.parallel對我們的重新整理任務是否有幫助.

  1. 引數設定
set mapred.max.split.size=256000000;        -- 決定每個map處理的最大的檔案大小,單位為B

hive on spark 知識

cdh 6.0.1 下通過設定:

set hive.execution.engine=spark;

可以將預設的mapreduce執行引擎切換為spark;
apache hadoop 下配置 hive on spark

引數調優

瞭解完了Spark作業執行的基本原理之後,對資源相關的引數就容易理解了。所謂的Spark資源引數調優,其實主要就是對Spark執行過程中各個使用資源的地方,通過調節各種引數,來優化資源使用的效率,從而提升Spark作業的執行效能。以下引數就是Spark中主要的資源引數,每個引數都對應著作業執行原理中的某個部分。

num-executors/spark.executor.instances

  • 引數說明:該引數用於設定Spark作業總共要用多少個Executor程序來執行。Driver在向YARN叢集管理器申請資源時,YARN叢集管理器會盡可能按照你的設定來在叢集的各個工作節點上,啟動相應數量的Executor程序。這個引數非常之重要,如果不設定的話,預設只會給你啟動少量的Executor程序,此時你的Spark作業的執行速度是非常慢的。
  • 引數調優建議:每個Spark作業的執行一般設定50~100個左右的Executor程序比較合適,設定太少或太多的Executor程序都不好。設定的太少,無法充分利用叢集資源;設定的太多的話,大部分佇列可能無法給予充分的資源。

executor-memory/spark.executor.memory

  • 引數說明:該引數用於設定每個Executor程序的記憶體。Executor記憶體的大小,很多時候直接決定了Spark作業的效能,而且跟常見的JVM OOM異常,也有直接的關聯。
  • 引數調優建議:每個Executor程序的記憶體設定4G8G較為合適。但是這只是一個參考值,具體的設定還是得根據不同部門的資源佇列來定。可以看看自己團隊的資源佇列的最大記憶體限制是多少,num-executors乘以executor-memory,是不能超過佇列的最大記憶體量的。此外,如果你是跟團隊裡其他人共享這個資源佇列,那麼申請的記憶體量最好不要超過資源佇列最大總記憶體的1/31/2,避免你自己的Spark作業佔用了佇列所有的資源,導致別的同學的作業無法執行。

executor-cores/spark.executor.cores

  • 引數說明:該引數用於設定每個Executor程序的CPU core數量。這個引數決定了每個Executor程序並行執行task執行緒的能力。因為每個CPU core同一時間只能執行一個task執行緒,因此每個Executor程序的CPU core數量越多,越能夠快速地執行完分配給自己的所有task執行緒。
  • 引數調優建議:Executor的CPU core數量設定為2~4個較為合適。同樣得根據不同部門的資源佇列來定,可以看看自己的資源佇列的最大CPU core限制是多少,再依據設定的Executor數量,來決定每個Executor程序可以分配到幾個CPU core。同樣建議,如果是跟他人共享這個佇列,那麼num-executors * executor-cores不要超過佇列總CPU core的1/3~1/2左右比較合適,也是避免影響其他同學的作業執行。

driver-memory

  • 引數說明:該引數用於設定Driver程序的記憶體。
  • 引數調優建議:Driver的記憶體通常來說不設定,或者設定1G左右應該就夠了。唯一需要注意的一點是,如果需要使用collect運算元將RDD的資料全部拉取到Driver上進行處理,那麼必須確保Driver的記憶體足夠大,否則會出現OOM記憶體溢位的問題。

spark.default.parallelism

  • 引數說明:該引數用於設定每個stage的預設task數量。這個引數極為重要,如果不設定可能會直接影響你的Spark作業效能。
  • 引數調優建議:Spark作業的預設task數量為500~1000個較為合適。很多同學常犯的一個錯誤就是不去設定這個引數,那麼此時就會導致Spark自己根據底層HDFS的block數量來設定task的數量,預設是一個HDFS block對應一個task。通常來說,Spark預設設定的數量是偏少的(比如就幾十個task),如果task數量偏少的話,就會導致你前面設定好的Executor的引數都前功盡棄。試想一下,無論你的Executor程序有多少個,記憶體和CPU有多大,但是task只有1個或者10個,那麼90%的Executor程序可能根本就沒有task執行,也就是白白浪費了資源!因此Spark官網建議的設定原則是,設定該引數為num-executors * executor-cores的2~3倍較為合適,比如Executor的總CPU core數量為300個,那麼設定1000個task是可以的,此時可以充分地利用Spark叢集的資源。

spark.storage.memoryFraction

  • 引數說明:該引數用於設定RDD持久化資料在Executor記憶體中能佔的比例,預設是0.6。也就是說,預設Executor 60%的記憶體,可以用來儲存持久化的RDD資料。根據你選擇的不同的持久化策略,如果記憶體不夠時,可能資料就不會持久化,或者資料會寫入磁碟。
  • 引數調優建議:如果Spark作業中,有較多的RDD持久化操作,該引數的值可以適當提高一些,保證持久化的資料能夠容納在記憶體中。避免記憶體不夠快取所有的資料,導致資料只能寫入磁碟中,降低了效能。但是如果Spark作業中的shuffle類操作比較多,而持久化操作比較少,那麼這個引數的值適當降低一些比較合適。此外,如果發現作業由於頻繁的gc導致執行緩慢(通過spark web ui可以觀察到作業的gc耗時),意味著task執行使用者程式碼的記憶體不夠用,那麼同樣建議調低這個引數的值。

spark.shuffle.memoryFraction

  • 引數說明:該引數用於設定shuffle過程中一個task拉取到上個stage的task的輸出後,進行聚合操作時能夠使用的Executor記憶體的比例,預設是0.2。也就是說,Executor預設只有20%的記憶體用來進行該操作。shuffle操作在進行聚合時,如果發現使用的記憶體超出了這個20%的限制,那麼多餘的資料就會溢寫到磁碟檔案中去,此時就會極大地降低效能。
  • 引數調優建議:如果Spark作業中的RDD持久化操作較少,shuffle操作較多時,建議降低持久化操作的記憶體佔比,提高shuffle操作的記憶體佔比比例,避免shuffle過程中資料過多時記憶體不夠用,必須溢寫到磁碟上,降低了效能。此外,如果發現作業由於頻繁的gc導致執行緩慢,意味著task執行使用者程式碼的記憶體不夠用,那麼同樣建議調低這個引數的值。

原文連結


Spark On Yarn執行中executor記憶體限制問題

解決Spark On Yarn執行中executor記憶體限制問題

叢集版本 Spark 2.2.0 + Hadoop 3.0-CDH6.0.1

hive on saprk , 設定:

hive> set hive.execution.engine=spark;
hive> set spark.executor.memory=31.5g;
hive> set spark.executor.cores=11;
hive> set spark.serializer=org.apache.spark.serializer.KryoSerializer;

提示記憶體不足

Failed to execute spark task, with exception 'org.apache.hadoop.hive.ql.metadata.HiveException(Failed to create Spark client for Spark session 50288c8b-96aa-44ad-9eea-3cb4abb1ae5b)'
FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client for Spark session 50288c8b-96aa-44ad-9eea-3cb4abb1ae5b

解決方案,修改Yarn的配置檔案:

1、yarn.nodemanager.resource.memory-mb容器記憶體

設定為 至少 : executor-memory(15g) + driver(512m)的記憶體,如上例可配置為 16g

2、yarn.scheduler.maximum-allocation-mb最大容器記憶體

設定為 至少 : executor-memory(15g) + driver(512m)的記憶體,如上例可配置為 16g

第一個引數為NodeManager的配置 ,第二個引數為 ResourceManager的配置。

字串處理

字串連線:

concat(str, str2, str3,...) 字串連線
concat_ws(separator, str, str2, str3, ...) 將字串用separator作為間隔連線起來

字串擷取

substr(s, 0, 1) 擷取第一個字元
substr(s, -1) 擷取最後一個字元

hive 中 join

mapjoin的優化在於,在mapreduce task開始之前,建立一個local task,小表以hshtable的形式載入到記憶體,然後序列化到磁碟,把記憶體的hashtable壓縮為tar檔案。然後把檔案分發到 Hadoop Distributed Cache,然後傳輸給每一個mapper,mapper在本地反序列化檔案並載入進記憶體在做join

sql

select workflow,count(workflow) from (select guid, substr(workflow, -1) workflow from hm2.workflow_list) m right join hm2.helper helper on m.guid = helper.guid and helper.dt = "2018-10-21" group by workflow;

記憶體溢位解決辦法:

set hive.auto.convert.join = false;
set hive.ignore.mapjoin.hint=false;
set hive.exec.parallel=true;

Hive中Join的原理和機制

籠統的說,Hive中的Join可分為Common Join(Reduce階段完成join)和Map Join(Map階段完成join)。本文簡單介紹一下兩種join的原理和機制。

Hive Common Join

如果不指定MapJoin或者不符合MapJoin的條件,那麼Hive解析器會將Join操作轉換成Common Join,即:在Reduce階段完成join.
整個過程包含Map、Shuffle、Reduce階段。

  • Map階段

讀取源表的資料,Map輸出時候以Join on條件中的列為key,如果Join有多個關聯鍵,則以這些關聯鍵的組合作為key;
Map輸出的value為join之後所關心的(select或者where中需要用到的)列;同時在value中還會包含表的Tag資訊,用於標明此value對應哪個表;
按照key進行排序

  • Shuffle階段

    根據key的值進行hash,並將key/value按照hash值推送至不同的reduce中,這樣確保兩個表中相同的key位於同一個reduce中

  • Reduce階段

    根據key的值完成join操作,期間通過Tag來識別不同表中的資料。

以下面的HQL為例,圖解其過程:

SELECT 
 a.id,a.dept,b.age 
FROM a join b 
ON (a.id = b.id);

Hive Map Join

MapJoin通常用於一個很小的表和一個大表進行join的場景,具體小表有多小,由引數hive.mapjoin.smalltable.filesize來決定,該引數表示小表的總大小,預設值為25000000位元組,即25M。
Hive0.7之前,需要使用hint提示 /*+ mapjoin(table) */才會執行MapJoin,否則執行Common Join,但在0.7版本之後,預設自動會轉換Map Join,由引數hive.auto.convert.join來控制,預設為true.
仍然以9.1中的HQL來說吧,假設a表為一張大表,b為小表,並且hive.auto.convert.join=true,那麼Hive在執行時候會自動轉化為MapJoin。

如圖中的流程,首先是Task A,它是一個Local Task(在客戶端本地執行的Task),負責掃描小表b的資料,將其轉換成一個HashTable的資料結構,並寫入本地的檔案中,之後將該檔案載入到DistributeCache中,該HashTable的資料結構可以抽象為:

key value
1 26
2 34
  • 接下來是Task B,該任務是一個沒有Reduce的MR,啟動MapTasks掃描大表a,在Map階段,根據a的每一條記錄去和DistributeCache中b表對應的HashTable關聯,並直接輸出結果。
  • 由於MapJoin沒有Reduce,所以由Map直接輸出結果檔案,有多少個Map Task,就有多少個結果檔案。

原文連結

轉義字元

hive> select split('a:1||b:2||c:3','\\|\\|') from hm2.test;
OK
["a:1","b:2","c:3"]
["a:1","b:2","c:3"]
["a:1","b:2","c:3"]
["a:1","b:2","c:3"]

insert table select from

insert into tbName select * from tbName2;
insert overwrite table tbName select * from tbName2;

insert overwrite例子

insert overwrite table hm2.helper partition(dt = '2018-06-22', hour = '09',msgtype = 'helper') select time,source,remote_addr,remote_user,body_bytes_sent,request_time,status,host,request_method,http_referrer,http_x_forwarded_for,http_user_agent,upstream_response_time,upstream_addr,guid,helperversion,osversion,ngx_timestamp,get_type,split(ip2area(http_x_forwarded_for,remote_addr), "\t")[0] country,split(ip2area(http_x_forwarded_for,remote_addr), "\t")[1] province,split(ip2area(http_x_forwarded_for,remote_addr), "\t")[2] city from hm2.helper where dt = '2018-06-22' and hour = '09' and msgtype = 'helper';

插入分割槽表,不用指定分割槽,可以自動識別

INSERT overwrite TABLE test.dis_helper PARTITION (dt,hour,msgtype) select `(num)?+.+` from (select *,row_number() over (partition by guid order by time asc) num from hm2.helper where dt ='2018-09-06'and hour between '00' and '23' and msgtype='helper') t where t.num=1;

這裡把資料去重,插入分割槽表test.dis_helper中,自動根據dt,hour,msgtype欄位的取值進入分割槽表,並且(num)?+.+表示除了num這個欄位。