
Spark: how to replace sc.parallelize(List(item1,item2)).collect().foreach(row=>{}) with parallel execution?


Scenario:

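1) Several data scenarios are defined, and the job iterates over all of them: for each scenario it computes statistics over the data that satisfies that scenario's conditions and stores the result in Hive;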

2) The existing code is as follows:

    case class IndoorOTTCalibrateBuildingVecotrLegend(oid: Int, minHeight: Int, maxHeight: Int, minGridIDCount: Int, maxGridIDCount: Int, heightType: Int) extends Serializable

    // Instantiate the building segments: scenarios are split by grid-cell count (area) and building height (e.g. shopping malls)
    val buildingHeightLegends = List(
      IndoorOTTCalibrateBuildingVecotrLegend(1, 1, 30, 1, 21, BuildingCalibrateHeightType.HeightType1.toString.toInt),
      IndoorOTTCalibrateBuildingVecotrLegend(2, 1, 30, 21, 45, BuildingCalibrateHeightType.HeightType2.toString.toInt),
      IndoorOTTCalibrateBuildingVecotrLegend(3, 1, 30, 45, 100, BuildingCalibrateHeightType.HeightType3.toString.toInt),
      IndoorOTTCalibrateBuildingVecotrLegend(4, 30, 50, 1, 21, BuildingCalibrateHeightType.HeightType4.toString.toInt),
      IndoorOTTCalibrateBuildingVecotrLegend(5, 30, 50, 21, 45, BuildingCalibrateHeightType.HeightType5.toString.toInt),
      IndoorOTTCalibrateBuildingVecotrLegend(6, 30, 50, 45, 100, BuildingCalibrateHeightType.HeightType6.toString.toInt),
      IndoorOTTCalibrateBuildingVecotrLegend(7, 50, 5000, 1, 100, BuildingCalibrateHeightType.HeightType7.toString.toInt))

    // parallelize(...).collect() returns the list to the driver, so this foreach still runs the scenarios one after another on the driver
    spark.sparkContext.parallelize(buildingHeightLegends).collect().foreach(buildingHeightLegend => {
      generateSampleBySenceType(spark, p_city, p_hour_start, p_hour_end, p_fpb_day, p_day_sample, linkLossCalibrateParameter, buildingHeightLegend)
    })

Note:

Inside generateSampleBySenceType() there is a query like:

    spark.sql(s"""
      |xxx
      |where t10.height>=${buildingHeightLegend.minHeight} and t10.height<${buildingHeightLegend.maxHeight}
      |and t10.gridcount>=${buildingHeightLegend.minGridIDCount} and t10.gridcount<${buildingHeightLegend.maxGridIDCount}
      |""".stripMargin)
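Each scenario keeps the rows whose height and grid count fall into half-open ranges [min, max), the same convention used by the cross join query at the end of this post. A minimal sketch in plain Scala (no Spark; `Legend` and `inScene` are hypothetical names mirroring the range fields of the case class) makes the boundary behaviour easy to check: a grid count of exactly 21 belongs to the 21 to 45 scenario, not the 1 to 21 one.

```scala
// Hypothetical stand-in for IndoorOTTCalibrateBuildingVecotrLegend, keeping only the range fields.
case class Legend(minHeight: Int, maxHeight: Int, minGridIDCount: Int, maxGridIDCount: Int)

object ScenePredicate {
  // Lower bounds are inclusive and upper bounds exclusive, so adjacent
  // scenarios (e.g. grid count 1..21 and 21..45) never both match a row.
  def inScene(height: Int, gridCount: Int, l: Legend): Boolean =
    height >= l.minHeight && height < l.maxHeight &&
      gridCount >= l.minGridIDCount && gridCount < l.maxGridIDCount
}
```

With these bounds the seven scenario ranges cover the (height, grid count) space without overlapping, so no row is counted in two scenarios.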

If the code is changed to:

    val buildingHeightLegends_df = spark.sqlContext.createDataFrame(buildingHeightLegends)
    buildingHeightLegends_df.createOrReplaceTempView("temp_buildingheightlegends")
    
    sql(s"""|select * from temp_buildingheightlegends""".stripMargin).repartition(buildingHeightLegends.length).foreachPartition(rows => {
      // This closure is serialized and executed inside tasks on the executors, not on the driver
      for (row <- rows) {
        val buildingHeightLegend = IndoorOTTCalibrateBuildingVecotrLegend(
          row.getAs[Int]("oid"),
          row.getAs[Int]("minHeight"),
          row.getAs[Int]("maxHeight"),
          row.getAs[Int]("minGridIDCount"),
          row.getAs[Int]("maxGridIDCount"),
          row.getAs[Int]("heightType"))
        // Calls spark.sql(...) from an executor, which fails: the SparkSession is only usable on the driver
        generateSampleBySenceType(spark, p_city, p_hour_start, p_hour_end, p_fpb_day, p_day_sample, linkLossCalibrateParameter, buildingHeightLegend)
      }
    })

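then an exception is thrown at the SQL call inside generateSampleBySenceType(), reporting that the SparkSession is null. The reason is that the function passed to foreachPartition runs inside tasks on the executors, while a SparkSession (and the SparkContext behind it) is only usable on the driver, so spark.sql(...) cannot be called from there.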

Solution:

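Register buildingHeightLegends as the temporary table temp_buildingHeightLegends and remove the outer foreach. Then, inside generateSampleBySenceType(), cross join temp_buildingHeightLegends with the other result sets and use the range conditions as the join filter. All scenarios are then handled by a single query, which Spark executes in a distributed way like any other query.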
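The cross join can replace the loop because filtering the data once per scenario and filtering the full (row, scenario) product with the same range predicate produce the same set of matches; the join just does it in one query instead of one query per scenario. A minimal sketch with plain Scala collections standing in for DataFrames (the `Scene` and `Grid` types, `CrossJoinEquivalence`, and the sample rows in the check are hypothetical):

```scala
case class Scene(sceneType: Int, minHeight: Int, maxHeight: Int, minGridCount: Int, maxGridCount: Int)
case class Grid(gridId: String, height: Int, gridCount: Int)

object CrossJoinEquivalence {
  // Half-open range predicate shared by both forms.
  def matches(g: Grid, s: Scene): Boolean =
    g.height >= s.minHeight && g.height < s.maxHeight &&
      g.gridCount >= s.minGridCount && g.gridCount < s.maxGridCount

  // Original shape: one filter pass over the data per scenario.
  def perSceneLoop(scenes: Seq[Scene], grids: Seq[Grid]): Seq[(Int, String)] =
    scenes.flatMap(s => grids.filter(g => matches(g, s)).map(g => (s.sceneType, g.gridId)))

  // Rewritten shape: every (grid, scene) pair, filtered by the same predicate.
  def crossJoin(scenes: Seq[Scene], grids: Seq[Grid]): Seq[(Int, String)] =
    for {
      g <- grids
      s <- scenes
      if matches(g, s)
    } yield (s.sceneType, g.gridId)
}
```

The two results differ only in order (the loop groups matches by scenario), so they are compared as sets; Spark does not guarantee row order for either form anyway.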

Test code (SQL Server syntax):

-- Scenario table
CREATE TABLE [dbo].[test_senceitems](
    [sencetype] [int] NULL,
    [minheight] [int] NULL,
    [maxheight] [int] NULL,
    [mingridcount] [int] NULL,
    [maxgridcount] [int] NULL
)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (1, 1, 30, 1, 21)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (2, 1, 30, 21, 45)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (3, 1, 30, 45, 100)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (4, 30, 50, 1, 21)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (5, 30, 50, 21, 45)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (6, 30, 50, 45, 100)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (7, 50, 5000, 1, 100)

-- Business data table to be filtered and aggregated
CREATE TABLE [dbo].[test_grid](
    [gridid] [nvarchar](50) NULL,
    [height] [int] NULL,
    [gridcount] [int] NULL
) 

INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g1', 8, 23)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g2', 3, 87)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g3', 4, 34)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g4', 30, 54)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g5', 32, 32)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g6', 32, 20)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g7', 120, 34)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g8', 89, 54)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g9', 9, 16)

The sql(s"""|""".stripMargin) code inside generateSampleBySenceType() is then replaced with something like:

select t10.*,t11.* 
from test_grid t10 
cross join test_senceitems t11
where t10.height>=t11.minheight and t10.height<t11.maxheight
and t10.gridcount>=t11.mingridcount and t10.gridcount<t11.maxgridcount
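The query can be checked against the test data by evaluating the same cross join in memory. A plain-Scala sketch (not Spark; `Scene`, `Grid`, `TestDataJoin` and `sceneOf` are hypothetical names) with the rows of test_senceitems and test_grid copied from above; with half-open ranges each of the nine test grids matches exactly one scenario.

```scala
case class Scene(sceneType: Int, minHeight: Int, maxHeight: Int, minGridCount: Int, maxGridCount: Int)
case class Grid(gridId: String, height: Int, gridCount: Int)

object TestDataJoin {
  // Rows of test_senceitems.
  val scenes: Seq[Scene] = Seq(
    Scene(1, 1, 30, 1, 21), Scene(2, 1, 30, 21, 45), Scene(3, 1, 30, 45, 100),
    Scene(4, 30, 50, 1, 21), Scene(5, 30, 50, 21, 45), Scene(6, 30, 50, 45, 100),
    Scene(7, 50, 5000, 1, 100))

  // Rows of test_grid.
  val grids: Seq[Grid] = Seq(
    Grid("g1", 8, 23), Grid("g2", 3, 87), Grid("g3", 4, 34),
    Grid("g4", 30, 54), Grid("g5", 32, 32), Grid("g6", 32, 20),
    Grid("g7", 120, 34), Grid("g8", 89, 54), Grid("g9", 9, 16))

  // Same conditions as the where clause of the cross join query.
  private def matches(g: Grid, s: Scene): Boolean =
    g.height >= s.minHeight && g.height < s.maxHeight &&
      g.gridCount >= s.minGridCount && g.gridCount < s.maxGridCount

  // test_grid t10 cross join test_senceitems t11, filtered by the range conditions.
  val joined: Seq[(Grid, Scene)] = for {
    g <- grids
    s <- scenes
    if matches(g, s)
  } yield (g, s)

  // gridid -> sencetype
  val sceneOf: Map[String, Int] = joined.map { case (g, s) => g.gridId -> s.sceneType }.toMap
}
```

For example, g4 (height 30, grid count 54) lands in scenario 6, because a height of 30 falls into [30, 50) rather than [1, 30).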

