
Spark Streaming: optimizing real-time reads from Kafka into Hive (high traffic)

Background:

Kafka traffic runs at about 800 MB/s. The program left behind by my predecessor lost large amounts of data, its generation logic was convoluted, and querying the resulting Hive table simply crashed. The optimization went in two directions: the program itself, and merging small files (the job's output produced a huge number of small files).

No more preamble, straight to the code.

The program

// imports required by this snippet (Spark 1.x streaming-kafka API)
import java.text.SimpleDateFormat
import java.util.Date
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

def main(args: Array[String]): Unit = {
val sdf = new SimpleDateFormat("yyyyMMddHHmm")
val broker_list = "XXXX";
val zk = "xxx";
val confSpark = new SparkConf()
.setAppName("kafka2hive")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.rdd.compress", "true")
.set("spark.sql.shuffle.partitions", "512") //生成的partition根據kafka topic 分割槽生成,這個配置項貌似沒效果
.set("spark.streaming.stopGracefullyOnShutdown", "true") //能夠處理完最後一批資料,再關閉程式,不會發生強制kill導致資料處理中斷,沒處理完的資料丟失
.set("spark.streaming.backpressure.enabled","true")//開啟後spark自動根據系統負載選擇最優消費速率
.set("spark.shuffle.manager", "sort")
.set("spark.locality.wait", "5ms")
//.setMaster("local[*]")

val kafkaMapParams = Map(
"auto.offset.reset" -> "largest",
"group.id" -> "kafka2dhive",
"zookeeper.session.timeout.ms" -> "40000",
"metadata.broker.list" -> broker_list,
"zookeeper.connect" -> zk
)
val topicsSet = Set("innerBashData")
val sc = new SparkContext(confSpark)
val ssc = new StreamingContext(sc,Seconds(30)) // the key knob: the micro-batch interval; test and tune it against your cluster resources
val sqlContext = new HiveContext(sc)
var daily = sdf.format(new Date()).substring(0,8)
var dailyTableName = "bashdata"+daily;
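// Note (comment added, not in the original): daily is computed once at startup, so a job that keeps running across midnight continues writing into the previous day's table; recompute it inside foreachRDD if that matters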
val schema = StructType(
StructField("ver", StringType, true) ::
StructField("session_id", StringType, true) ::
StructField("host_time", StringType, true) ::
StructField("host_name", StringType, true) ::
StructField("src_ip", StringType, true) ::
Nil)

sqlContext.sql(s"""create table if not exists $dailyTableName(
ver string ,
session_id string ,
host_time string ,
host_name string ,
src_ip string
)
PARTITIONED BY (hours string,min string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)

val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaMapParams, topicsSet)
lines.foreachRDD( beforerdd => {
val rdd = beforerdd.map( rdd1 => {
rdd1._2
})
rdd.cache()

val agentDataFrame = sqlContext.read.schema(schema).json(rdd)
// .coalesce(10) // controls the number of output files per batch
agentDataFrame.registerTempTable("tmp_bashdata")
sqlContext.sql("set hive.exec.dynamic.partition = true")
sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
sqlContext.sql("set hive.mapred.supports.subdirectories=true")
sqlContext.sql("set mapreduce.input.fileinputformat.input.dir.recursive=true")
sqlContext.sql("set mapred.max.split.size=256000000")
sqlContext.sql("set mapred.min.split.size.per.node=128000000")
sqlContext.sql("set mapred.min.split.size.per.rack=128000000")
sqlContext.sql("set hive.merge.mapfiles=true")
sqlContext.sql("set hive.merge.mapredfiles=true")
sqlContext.sql("set hive.merge.size.per.task=256000000")
sqlContext.sql("set hive.merge.smallfiles.avgsize=256000000")
sqlContext.sql("set hive.groupby.skewindata=true")

var hours = sdf.format(new Date()).substring(8,10)
var min = sdf.format(new Date()).substring(10,12) // a new partition folder every 10 minutes, and the data volume is still huge
sqlContext.sql(
s"""
|INSERT OVERWRITE TABLE $dailyTableName PARTITION(hours='$hours', min='$min')
|SELECT
| ver,
| session_id,
| host_time,
| host_name,
| src_ip
|FROM tmp_bashdata
""".stripMargin)

});
ssc.start()
ssc.awaitTermination()
}
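The commented-out coalesce call in the middle of the loop is the in-program half of the small-file fix. Below is a minimal sketch of what that variant of the foreachRDD body could look like; the 10 files per batch is an assumed value, and the unpersist call is my own addition to release each cached batch once it has been written:

lines.foreachRDD( beforerdd => {
val rdd = beforerdd.map(_._2)
rdd.cache()
// coalesce before registering the temp table so each micro-batch writes at most 10 files (assumed value)
val agentDataFrame = sqlContext.read.schema(schema).json(rdd).coalesce(10)
agentDataFrame.registerTempTable("tmp_bashdata")
// ... same SET statements and INSERT OVERWRITE as in the program above ...
rdd.unpersist(true) // drop the cached batch once it has been written out
})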

Merging small files

The core idea is to rewrite the data into another table with a fixed number of partitions. The script is as follows:

set mapred.reduce.tasks=5;
set mapred.max.split.size=512000000;
insert into table yhtable PARTITION(hours=14,min=1)
select
ver,
session_id,
host_time,
host_name,
src_ip
from aa20190624 where hours=14 and min=0;
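
If you would rather keep the compaction in Spark as well, the same rewrite can be sketched with the HiveContext used above. Everything here simply mirrors the Hive script (the table names yhtable and aa20190624, the hours/min values, and 5 output files to match mapred.reduce.tasks=5); treat it as a sketch under those assumptions, not a drop-in replacement:

// read the small-file partition, shrink it to 5 partitions, and write it into the target table
val compacted = sqlContext.sql(
"""
|SELECT ver, session_id, host_time, host_name, src_ip
|FROM aa20190624
|WHERE hours=14 AND min=0
""".stripMargin).coalesce(5) // 5 output files, same as mapred.reduce.tasks=5 above
compacted.registerTempTable("tmp_merge")
sqlContext.sql(
"""
|INSERT INTO TABLE yhtable PARTITION(hours=14, min=1)
|SELECT ver, session_id, host_time, host_name, src_ip FROM tmp_merge
""".stripMargin)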

Copyright notice: this is an original article by CSDN blogger "silentanytime", licensed under CC 4.0 BY-SA. Please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/silentanytime/article/details/94395713