1. 程式人生 > >反射構建DataFrame樣例類引數大於22

反射構建DataFrame樣例類引數大於22

這個錯誤出現在case class引數超出22個的時候。

錯誤訊息:`case classes cannot have more than 22 parameters`
在scala 2.11.x版本以下時case class 的引數最多為22個,如果超過這個引數又不能升級時(比如目前spark依賴於2.10.x)要怎麼辦?

解決方法:自訂一個普通類別,繼承 Product 特質(trait)並混入 Serializable 以支援序列化。
這裡舉一個引數超過22個的例子,其餘情況依此類推。

/**
 * Spark job that reads comma-separated text lines, maps the first 27 columns
 * of every line that has at least 27 columns onto an `ss` record, and writes
 * the result as a snappy-compressed Parquet data set.
 *
 * Usage: `Bz2Parquet2 <inputPath> <outputPath>`
 */
object Bz2Parquet2 {

  /**
   * Entry point.
   *
   * @param args exactly two elements: the input path and the output path.
   *             Any other argument count prints a message and exits with
   *             status 1.
   */
  def main(args: Array[String]): Unit = {
    // Validate the command line before touching Spark.
    if (args.length != 2) {
      println("目錄不正確,退出程式")
      sys.exit(1)
    }
    // Bind the two paths; the length check above makes this pattern total.
    val Array(inputPath, outputPath) = args

    val conf = new SparkConf()
      .setAppName(this.getClass.getName)
      .setMaster("local[*]")
      // Serialize shuffled/cached data with Kryo instead of Java serialization.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    try {
      val sQLContext = new SQLContext(sc)
      // Original author's note: snappy was not yet the default Parquet codec
      // in Spark 1.6 (it became the default in 2.0), so set it explicitly.
      sQLContext.setConf("spark.sql.parquet.compression.codec", "snappy")
      import sQLContext.implicits._

      val lines = sc.textFile(inputPath)

      // A negative split limit keeps trailing empty columns, so a record that
      // ends in empty fields is not mistaken for a short one.
      val rowRDD = lines
        .map(_.split(",", -1))
        // Columns 0..26 are read below, so at least 27 must be present.
        .filter(_.length >= 27)
        .map { arr =>
          // NBF is a helper defined outside this snippet; presumably it parses
          // numbers with a fallback for malformed input -- TODO confirm.
          new ss(
            arr(0),
            NBF.toInt(arr(1)),
            NBF.toInt(arr(2)),
            NBF.toInt(arr(3)),
            NBF.toInt(arr(4)),
            arr(5),
            arr(6),
            NBF.toInt(arr(7)),
            NBF.toInt(arr(8)),
            NBF.toDouble(arr(9)),
            NBF.toDouble(arr(10)),
            arr(11),
            arr(12),
            arr(13),
            arr(14),
            arr(15),
            arr(16),
            NBF.toInt(arr(17)),
            arr(18),
            arr(19),
            NBF.toInt(arr(20)),
            NBF.toInt(arr(21)),
            arr(22),
            arr(23),
            arr(24),
            arr(25),
            NBF.toInt(arr(26))
          )
        }

      // Collapse to a single partition and write the Parquet output.
      rowRDD.toDF().coalesce(1).write.parquet(outputPath)
    } finally {
      // Release the SparkContext even when the job fails.
      sc.stop()
    }
  }
}

/**
 * Hand-written substitute for a case class with more than 22 fields, for
 * Scala versions that cap case classes at 22 parameters. It implements
 * [[scala.Product]] directly and mixes in `Serializable`.
 *
 * `productArity` and `productElement` cover every constructor parameter, in
 * declaration order, so the Product view of an instance always agrees with
 * its constructor. NOTE(review): Spark's reflection-based `toDF()` is expected
 * to rely on that agreement -- verify against the Spark version in use.
 *
 * The first 27 parameters are required. The remaining 13 default to `""` or
 * `0`, so callers that only populate the first 27 columns still compile.
 */
class ss(
  sessionid: String,
  advertisersid: Int,
  adorderid: Int,
  adcreativeid: Int,
  adplatformproviderid: Int,
  sdkversion: String,
  adplatformkey: String,
  putinmodeltype: Int,
  requestmode: Int,
  adprice: Double,
  adppprice: Double,
  requestdate: String,
  ip: String,
  appid: String,
  appname: String,
  uuid: String,
  device: String,
  client: Int,
  osversion: String,
  density: String,
  pw: Int,
  ph: Int,
  long: String,
  lat: String,
  provincename: String,
  cityname: String,
  ispid: Int,
  // Optional from here on: defaults keep 27-argument construction valid.
  ispname: String = "",
  networkmannerid: Int = 0,
  networkmannername: String = "",
  iseffective: Int = 0,
  isbilling: Int = 0,
  adspacetype: Int = 0,
  adspacetypename: String = "",
  devicetype: Int = 0,
  processnode: Int = 0,
  apptype: Int = 0,
  district: String = "",
  paymode: Int = 0,
  isbid: Int = 0
) extends Product with Serializable {

  /**
   * Returns the n-th field in constructor order (0-based).
   *
   * @throws IndexOutOfBoundsException if `n` is outside `0 until productArity`
   */
  def productElement(n: Int): Any = n match {
    case 0  => sessionid
    case 1  => advertisersid
    case 2  => adorderid
    case 3  => adcreativeid
    case 4  => adplatformproviderid
    case 5  => sdkversion
    case 6  => adplatformkey
    case 7  => putinmodeltype
    case 8  => requestmode
    case 9  => adprice
    case 10 => adppprice
    case 11 => requestdate
    case 12 => ip
    case 13 => appid
    case 14 => appname
    case 15 => uuid
    case 16 => device
    case 17 => client
    case 18 => osversion
    case 19 => density
    case 20 => pw
    case 21 => ph
    case 22 => long
    case 23 => lat
    case 24 => provincename
    case 25 => cityname
    case 26 => ispid
    case 27 => ispname
    case 28 => networkmannerid
    case 29 => networkmannername
    case 30 => iseffective
    case 31 => isbilling
    case 32 => adspacetype
    case 33 => adspacetypename
    case 34 => devicetype
    case 35 => processnode
    case 36 => apptype
    case 37 => district
    case 38 => paymode
    case 39 => isbid
    case _  => throw new IndexOutOfBoundsException(n.toString)
  }

  /** True only for other `ss` instances. */
  def canEqual(that: Any): Boolean = that.isInstanceOf[ss]

  /** Number of fields; must equal the number of constructor parameters. */
  def productArity: Int = 40
}