1. 程式人生 > >如何解決spark寫hive慢的問題

如何解決spark寫hive慢的問題

在使用spark寫hive過程中,發現最耗時的部分是將產生的結果寫入hive,舉個例子,對3g*1G表的join來講,將結果使用以下方式直接寫入hive表需要超過半小時的時間:

dataframe.registerTempTable("result")

sql(s"""INSERT OVERWRITE Table $outputTable PARTITION (dt ='$outputDate') select * from result""")

而整個結果資料的產生只需要4分鐘左右的時間,比如以下方式:將結果以textfile存入hdfs:

result.rdd.saveAsTextFile(output_tmp_dir)

由此可見,對hive的寫入操作耗用了大量的時間。

對此現象的優化可以是,將檔案存為符合hive table檔案的格式,然後使用hive load將產生的結果檔案直接move到指定目錄下。程式碼如下:

result.rdd.map { r => r.mkString("\001") }.repartition(partitions).saveAsTextFile(output_tmp_dir)

sql(s"""load data inpath '$output_tmp_dir' overwrite into table $output partition (dt='$dt')""")

詳解:

  1. result.rdd.map { r => r.mkString("\001") }.repartition(partitions).saveAsTextFile(output_tmp_dir):
    hive column預設分隔符在scala/java中的表示為“/001”,r.mkString("/001")既是將column以分隔符/001進行分割,hive在匯入時會自動識別。
    repartition(partitions)是為了防止hdfs中產生大量小檔案。partitions的設定與最終結果大小有關,一般是result_size/hdfs_block_size。
  2. sql(s"""load data inpath '$output_tmp_dir' overwrite into table $output partition (dt='$dt')""")
    此處使用hive load data命令,將hdfs檔案load到hive表中。後臺操作為直接將目錄下的檔案移到hive table所在目錄,所以只是hdfs move資料的過程,執行非常快。 
  3. 需要注意的是,此處要求hive建表時,已textfile格式建表。orc的方式不支援。對orc的表,可以建立臨時表使用textfile臨時儲存,然後用以下命令進行匯入:
    sql(s"""load data inpath '$output_tmp_dir' overwrite into table $tmp_table partition (dt='$dt')""")
    sql(s"""INSERT OVERWRITE Table $outputTable PARTITION (dt ='$outputDate') select * from $tmp_table where dt='$dt'""") 
    在資源配置為--num-executors 20 --executor-cores 4,結果資料為1.8g的情況下,需要額外耗時50s。好處是結果資料使用列式、壓縮方式儲存,壓縮比12.7左右。

使用優化後的方式,原有test case的耗時從半小時降到4分鐘,效率提升明顯。