1. 程式人生 > 實用技巧 >hive 結合執行計劃 分析 limit 執行原理

hive 結合執行計劃 分析 limit 執行原理

在hive查詢中要限制查詢輸出條數, 可以用limit 關鍵詞指定,如 select columnname1 from table1 limit 10; 這樣hive將輸出符合查詢條件的10個記錄,從根本上說, hive是hadoop提交作業的客戶端,它使用antlr詞法語法分析工具,對SQL進行分析優化後翻譯成一系列MapReduce作業,向hadoop提交執行作業以得到結果.
看一條簡單的SQL語句:

  1. selectdeviceidfromt_aa_pc_logwherept='2012-07-07-00'limit1;

這條語句指定分割槽欄位 pt為2012-07-07-00, 限制結果為 limit 1. 假設執行這個MR作業需要5個map, 那麼每個map應該輸出一條記錄,從jobtrack 的jobdetails頁面中的計數器中 Map Input Records 一項應該顯示為5(即該作業中Map階段總共輸入5條記錄),結果是否如預計的那樣, 通過執行改SQL來驗證:

  1. >selectdeviceidfromt_aa_pc_logwherept='2012-07-07-00'limit1;
  2. TotalMapReducejobs=1
  3. LaunchingJob1outof1
  4. Numberofreducetasksissetto0sincethere'snoreduceoperator
  5. StartingJob=job_201205162059_1547550,TrackingURL=http://jt.dc.sh-wgq.sdo.com:50030/jobdetails.jsp?jobid=job_201205162059_1547550
  6. KillCommand=/home/hdfs/hadoop-current/bin/hadoopjob
    -Dmapred.job.tracker=10.133.10.103:50020-killjob_201205162059_1547550
  7. 2012-07-0716:22:42,570Stage-1map=0%,reduce=0%
  8. 2012-07-0716:22:48,628Stage-1map=80%,reduce=0%
  9. 2012-07-0716:22:49,640Stage-1map=100%,reduce=0%
  10. 2012-07-0716:22:50,654Stage-1map=100%,reduce=100%
  11. EndedJob=job_201205162059_1547550
  12. OK
  13. 0cf49387a23d9cec25da3d76d6988546
  14. Timetaken:13.499seconds
  15. hive>

正如limit 1限制,輸出一條記錄,再通過http://jt.dc.sh-wgq.sdo.com:50030/jobdetails.jsp?jobid=job_201205162059_1547550
檢視Map Input Records項:

上圖顯示Map Input Records實際上是35,並非之前設想的每個MAP一條,總共5條,那多出來的30條記錄又是怎麼來的? 實際上這個跟hive mapreduce實現有關,先來看看上面這條SQL的執行計劃:

  1. >explainselectdeviceidfromt_aa_pc_logwherept='2012-07-07-00'limit1;
  2. OK
  3. STAGEDEPENDENCIES:
  4. Stage-1isarootstage
  5. Stage-0isarootstage
  6. STAGEPLANS:
  7. Stage:Stage-1
  8. MapReduce
  9. Alias->MapOperatorTree:
  10. t_aa_pc_log
  11. TableScan
  12. alias:t_aa_pc_log
  13. FilterOperator
  14. predicate:
  15. expr:(pt='2012-07-07-00')
  16. type:boolean
  17. SelectOperator
  18. expressions:
  19. expr:deviceid
  20. type:string
  21. outputColumnNames:_col0
  22. Limit
  23. FileOutputOperator
  24. compressed:false
  25. GlobalTableId:0
  26. table:
  27. inputformat:org.apache.hadoop.mapred.TextInputFormat
  28. outputformat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  29. Stage:Stage-0
  30. FetchOperator
  31. limit:1
  32. Timetaken:0.418seconds

改執行計劃顯示,Stage-1 是一個MR程式,且只有map過程, 沒有reduce過程,也就是說在Map過程就直接將結果輸出到HDFS檔案系統, Stage-0是依賴於Stage-1的檔案讀取操作,它不是MR作業,只是一個基於hadoop檔案系統客戶端的分散式檔案讀取程式。
重點分析Stage-1過程,一條記錄被讀取後呼叫hive自定義mapper函式,依次經過
TableScanOperator ->Filter Operator ->Select Operator ->Limit Operator->File Output Operator, 以上每一個Operator都是hive定義的一個處理過程, 每一個 Operator都定義有:

  1. protectedList<Operator<?extendsSerializable>>childOperators;
  2. protectedList<Operator<?extendsSerializable>>parentOperators;

這樣就構成了一個 Operator圖,hive正是基於這些圖關係來處理諸如limit, group by, join等操作. Operator 基類定義一個:

  1. protectedbooleandone;// 初始化值為false

這個欄位指示某一個層級的Operator是否已經處理完成,每當一條記錄進入特定的Operator操作時,當前Operator會判斷自己的childOperators 的done是否全部為true, 如果是, 表示childOperators已去全部處理完畢, 當前這個Operator也把自己的 done設定為true, 這樣層層返回,直到最外層的Operator, 這個查詢中涉及的部分Operator如下圖:

該hive MR作業中指定的mapper是:

  1. mapred.mapper.class= org.apache.hadoop.hive.ql.exec.ExecMapper

input format是:

  1. hive.input.formatorg.apache.hadoop.hive.ql.io.CombineHiveInputFormat

部分執行流程:

MapRunner會迴圈呼叫CombineHiveRecordReader的doNext方法讀入行記錄,直到doNext方法返回false, doNext方法中有一個重要的邏輯來控制記錄讀取是否結束

  1. @Override
  2. publicbooleandoNext(Kkey,Vvalue)throwsIOException{
  3. if(ExecMapper.getDone()){
  4. returnfalse;
  5. }
  6. returnrecordReader.next(key,value);
  7. }

每讀取一條記錄都會判斷 MapRunner.getDone()是否為真, 如果是則結束Mapper讀取過程, ExecMapper類中定義了一個靜態變數done(靜態非常重要,因為在hadoop框架下執行時 CombineHiveRecordReader無法拿到 ExecMapper例項), 當 MapRunner讀取一條記錄後就會呼叫 MapRunner的map函式, ExecMapper中定義了一個MapOperator,MapOperator的 childOperators列表中持有TableScanOperator例項,依次類推, 各Operator遞迴包含.
ExecMapper的map函式被呼叫時會先判斷 MapOperator的done是否為true, 如果是,則將自己的靜態變數done設定為true(這樣 CombineHiveRecordReader在下一次讀取記錄時發現 ExecMapper的done為true, 結束mapper記錄讀取),否則執行MapOperator的process方法, 具體邏輯如下:

  1. publicvoidmap(Objectkey,Objectvalue,OutputCollectoroutput,
  2. Reporterreporter)throwsIOException{
  3. if(oc==null){
  4. oc=output;
  5. rp=reporter;
  6. mo.setOutputCollector(oc);
  7. mo.setReporter(rp);
  8. }
  9. //resettheexecContextforeachnewrow
  10. execContext.resetRow();
  11. try{
  12. if(mo.getDone()){
  13. done=true;
  14. }else{
  15. //Sincethereisnoconceptofagroup,wedon'tinvoke
  16. //startGroup/endGroupforamapper
  17. mo.process((Writable)value);

接下來再看看各Operator如何判斷自己狀態是否為執行完成:

  1. intchildrenDone=0;
  2. for(inti=0;i<childOperatorsArray.length;i++){
  3. Operator<?extendsSerializable>o=childOperatorsArray[i];
  4. if(o.getDone()){
  5. childrenDone++;
  6. }else{
  7. o.process(row,childOperatorsTag[i]);
  8. }
  9. }
  10. //ifallchildrenaredone,thisoperatorisalsodone
  11. if(childrenDone==childOperatorsArray.length){
  12. setDone(true);
  13. }

每個Operator都判斷自己的子Operator狀態是否全部完成, 如果是則把自己的狀態也設定成done=true.
最後再看LimitOperator的判斷邏輯:

  1. @Override
  2. publicvoidprocessOp(Objectrow,inttag)throwsHiveException{
  3. if(currCount<limit){
  4. forward(row,inputObjInspectors[tag]);
  5. currCount++;
  6. }else{
  7. setDone(true);
  8. }
  9. }

currCount是一個記錄處理的計數器, 初始值為0, 當該值大於等於limit後,將自己標識成處理完成狀態,即設定done=true.
分析到現在, 已經可以非常清晰的解釋最初的疑問了, 為什麼 limit 1, map數為5的前提下, Map Input Records 是35而不是5
1. 第一條記錄進入LimitOperator done 為false
2. 第二條記錄進入LimitOperator done 為true
3. 第三條記錄進入SelectOperator done 設定為true
4. 第四條記錄進入FilterOperator done設定為true
5. 第五條記錄進入TableScanOperator done設定為true
6. 第六條記錄進入MapOperator done設定為true
7. 第7條記錄進入ExecMapper 靜態變數done設定為true
8. 讀取第八條記錄時 CombineHiveRecordReader發現 ExecMapper的done已經為true, 結束資料讀取,從而 MapRunner退出迴圈, 結束mapper過程.
從上面8個步驟看出, 每個map會讀取7條記錄, 5個map, 正好是35條記錄.
在平時工作中, 通過分析 hive 執行計劃可以讓我們清楚的知道MR中的每一個過程,理解HIVE執行過程, 進而對SQL優化.

轉載於:https://blog.51cto.com/yaoyinjie/923378