apach zeppelin中使用spark streaming:基本功能
一.需求
在使用spark-streaming 處理流式任務時,由於spark-shell需要登入到和生產叢集相連的機器開啟,使用起來也有諸多不便,且預設不支援kafka等源,所以萌生使用zeppelin 中的spark interpreter來完成streaming 任務.
二.嘗試
在網上找到了一個改版的zeppelin版wordcount例子.
第一步啟動nc 監聽埠7777
第二步在zeppelin spark interpreter 中啟動streming任務
%spark
sc.version
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
case class Message(createdAt:Long, text:String)
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("10.104.90.40", 7777)
val errorLines = lines.window(Seconds(10))
val message = errorLines.map(status=>
Message(System.currentTimeMillis()/1000, status)
).foreachRDD(rdd=>
if (rdd.isEmpty()==false) {
rdd.toDF().registerTempTable("message")}
)
errorLines.print
ssc.start()
第三步在nc中輸入一些測試資料,在zeppelin spark interpreter讀取streaming處理的結果
%spark
val data = sqlContext.sql("select * from message")
data.show()
得到類似如下的輸出
data: org.apache.spark.sql.DataFrame = [createdAt: bigint, text: string] +----------+----+ | createdAt|text| +----------+----+ |1493709315| asd| |1493709321| as| |1493709321| d| |1493709321| asd| |1493709315| as| |1493709315| d| |1493709321| asd| |1493709321| as| |1493709321| das| |1493709315| d| |1493709321| as| |1493709315| d| |1493709315| 1q2| |1493709321| 3| |1493709321| 12| |1493709321| 312| |1493709315| | |1493709321| 3| |1493709315| 123| |1493709321| qw| +----------+----+ only showing top 20 rows
三.問題
嘗試的過程中發現瞭如下問題:
1.停止streaming context的方式不正確
如果使用ssc.stop()停止spark streaming context 會導致zeppelin 服務端的spark context也一併停止,導致出現如下的錯誤,並且在web ui重啟spark interpreter也不解決問題,最後只能重啟整個zeppelin 服務.
java.lang.IllegalStateException: RpcEnv has been stopped
在zeppelin中停止streaming context的正確方式為
ssc.stop(stopSparkContext=false, stopGracefully=true)
2.nc server同一時間只能服務一個client
剛開始嘗試nc用法時,先使用nc -lk 7777啟動server端,再使用nc locahost 7777啟動client,嘗試互相傳送資訊成功,在zeppelin 中streaming任務卻總是讀不到任何資料
原因就在於nv server只能同時服務一個client端.
在多個client 端同時連結server 端時,按建立連結的先後會出現只有一個client埠能接收資料的情況.
如果zeppelin中的streaming任務作為client端接收到了server端的訊息,即使按照1中的方法停止了streaming context,別的client端也依然接受不到訊息,推測可能是server端服務物件仍然為streaming 任務.
3.ERROR [2017-05-02 11:31:12,686] ({dispatcher-event-loop-2} Logging.scala[logError]:70) - Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to loc
alhost:7777 - java.net.ConnectException: Connection refuse
開始時使用localhost 作為socketTextStream中的host地址,卻總是會出現上面的錯誤,但是使用telnet >open localhost 7777卻可以成功建立連結
把localhost 改成10.104.90.40 則不會再出現上述錯誤,推測原因在於zepplin 使用的spark interpreter設定執行在了cluster mode,在除10.104.90.40 以為的機器上並沒有nc 執行,7777埠也是不通的,所以會出現訪問拒絕.
4.Attempted to use BlockRDD[36063] at socketTextStream at <console>:217 after its blocks have been removed!
當使用val data = sqlContext.sql("select * from message")檢視streaming 處理結果時,查詢後一段時間(幾s)後再次執行查詢任務就會出現如上錯誤.
在zeppelin spark interpreter日誌中發現每個視窗執行完之後都與類似如下的日誌
Removing RDD 37697 from persistence list
從這篇文章中的程式碼分析可見出錯原因在於blockrdd 已經被刪了導致,結合spark streaming dag任務結束後會刪除資料可知
這個報錯的原因在於message這張由registerTempTable 產生的表實際的儲存rdd已經被刪除導致.
5.hive接入問題,不解
INFO [2017-04-28 22:15:10,455] ({pool-2-thread-2} HiveMetaStoreClient.java[open]:376) - Trying to connect to metastore with URI thrift://10.104.90.40:9083
WARN [2017-04-28 22:15:10,456] ({pool-2-thread-2} UserGroupInformation.java[getGroupNames]:1492) - No groups available for user heyang.wang
WARN [2017-04-28 22:15:10,462] ({pool-2-thread-2} HiveMetaStoreClient.java[open]:444) - set_ugi() not successful, Likely cause: new client talking to old server. Continuing without it.
5.1假期前一度出現的問題,只要對rdd進行相關action類操作如rdd.toDf,rdd.collect等就會觸發,但是當時hive cli和beeline都可以正常訪問hive,hive metastore 日誌中也沒有報錯資訊.
但是5.1假期或卻又沒問題了.
四.嘗試在livy中執行spark-streaming任務
經過測試,在livy中可以以同樣的方式使用spark streaming,只需要改%spark為%livy.spark
五.接入kafka資料來源(未完)
參考來源: