Spark自定義類在Driver和Executor端傳輸問題
方式一:自定義一個類,並且這個類需要實現Serializable介面
1.首先寫一個class自定義類
class Rules extends Serializable {
val rulesMap = Map("hadoop" -> 2.7, "spark" -> 2.2)
//val hostname = InetAddress.getLocalHost.getHostName
//println(hostname + "@@@@@@@@@@@@@@@@")
}
2.寫一個spark程式
val conf = new SparkConf().setAppName("SerTest") val sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile(args(0)) val r = lines.map(word => { val rules = new Rules val hostname = InetAddress.getLocalHost.getHostName val threadName = Thread.currentThread().getName (hostname, threadName, rules.rulesMap.getOrElse(word, 0), rules.toString) })
總結:如果採用這種方式的話,會導致rules物件被多次建立,因為map方法是對RDD中的每一條資料都執行那裡面的演算法的,所以說這種方式會造成資源的浪費。自定義的類要想在運算元中使用的話,就必須將自定義的類進行序列化,也就是基層Serializable介面,不然會拋異常Rules物件應該在運算元外面進行new,如下:
val conf = new SparkConf().setAppName("SerTest") val sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile(args(0)) val rules = new Rules val r = lines.map(word => { val hostname = InetAddress.getLocalHost.getHostName val threadName = Thread.currentThread().getName (hostname, threadName, rules.rulesMap.getOrElse(word, 0), rules.toString) })
因為運算元中的變數都會被髮送到Executor端進行執行,所以Driver端new出來的物件會被髮送到Executor端進行執行的,如果Executor端有多個task的話,那麼這個例項就會被分發到多個task中,也就是每個task會儲存一個例項,這樣做會造成資源的浪費,具體細節如下圖:
方式二:自定義一個object物件,這個物件同樣要實現Serializable介面
1.自定義一個object物件,使用object修飾的話,這個類相當於java中的單例物件,只會被初始化一次。
在這裡插入程式碼片object Rules extends Serializable { val rulesMap = Map("hadoop" -> 2.7, "spark" -> 2.2) val hostname = InetAddress.getLocalHost.getHostName println(hostname + "@@@@@@@@@@@@@@@@!!!!") }
2.例項化這個單例物件
val conf = new SparkConf().setAppName("SerTest")
val sc = new SparkContext(conf)
val lines: RDD[String] = sc.textFile(args(0))
val rules = Rules
val r = lines.map(word => {
val hostname = InetAddress.getLocalHost.getHostName
val threadName = Thread.currentThread().getName
//rules的實際是在Executor中使用的
(hostname, threadName, rules.rulesMap.getOrElse(word, 0), rules.toString)
})
總結:這裡的Rules的object物件是需要被序列化的,在scala中使用object修飾的類是單例的
在類載入的時候時候就會回被初始化,也就是說,因為這裡rules是在map外面定義的,所以這個類只會被初始化一次,然後再task任務提交的時候,會見這個物件傳送到對應的Executor上,並且每個worker上的Executor上多個task公用一個rules物件,這樣做的好處是在同一個Executor中多個task可以公用一個rules物件。
方式三:前兩種方式都是在Driver端例項化這個物件的,在後期程式執行的時候會見這個例項化過的物件隨著task物件的傳送而傳送到Executor端執行,如果資料量特別大的情況下,會造成網路頻寬的消耗,最理想的方式就是直接在Executor端建立 這個例項化的物件,因為scala中object修飾的物件是單例的,所以最好的方式就是在Executor端直接呼叫這個單例物件:這裡不進行序列化的原因是因為,不需要將物件從Driver端傳送到Executor端,而是直接在Executor端生成
1.////第三種方式,希望Rules在EXecutor中被初始化(不走網路了,就不必實現序列化介面)
object Rules {
val rulesMap = Map("hadoop" -> 2.7, "spark" -> 2.2)
val hostname = InetAddress.getLocalHost.getHostName
println(hostname + "@@@@@@@@@@@@@@@@!!!!")
}
2.編寫spark程式
val conf = new SparkConf().setAppName("SerTest")
val sc = new SparkContext(conf)
val lines: RDD[String] = sc.textFile(args(0))
val r = lines.map(word => {
//函式的執行是在Executor執行的(Task中執行的)
val hostname = InetAddress.getLocalHost.getHostName
val threadName = Thread.currentThread().getName
(hostname, threadName, Rules.rulesMap.getOrElse(word, 0), Rules.toString)
})
這裡直接呼叫Rules單例物件,然後呼叫裡面的靜態方法,因為單例物件中的方法都是單例的,不需要再進行new obj ,而是直接呼叫的。這樣做的好處就是最終 Rules會隨著Task直接傳送到Executor上,初始化的時候直接在Executor端進行了。可以將規則資訊使用這種方式進行傳送到對應的Executor中。