1. 程式人生 > >apach zeppelin中使用spark streaming:基本功能

apach zeppelin中使用spark streaming:基本功能

一.需求

在使用spark-streaming 處理流式任務時,由於spark-shell需要登入到和生產叢集相連的機器開啟,使用起來也有諸多不便,且預設不支援kafka等源,所以萌生使用zeppelin 中的spark interpreter來完成streaming 任務.

二.嘗試

在網上找到了一個改版的zeppelin版wordcount例子

第一步啟動nc 監聽埠7777

第二步在zeppelin spark interpreter 中啟動streming任務

%spark

sc.version
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
case class Message(createdAt:Long, text:String)
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("10.104.90.40", 7777)
val errorLines = lines.window(Seconds(10))
val message = errorLines.map(status=>
Message(System.currentTimeMillis()/1000, status)
).foreachRDD(rdd=>
if (rdd.isEmpty()==false) {
rdd.toDF().registerTempTable("message")}
)
errorLines.print
ssc.start()

第三步在nc中輸入一些測試資料,在zeppelin spark interpreter讀取streaming處理的結果

%spark
val data = sqlContext.sql("select * from message")
data.show()

得到類似如下的輸出

data: org.apache.spark.sql.DataFrame = [createdAt: bigint, text: string] +----------+----+ | createdAt|text| +----------+----+ |1493709315| asd| |1493709321| as| |1493709321| d| |1493709321| asd| |1493709315| as| |1493709315| d| |1493709321| asd| |1493709321| as| |1493709321| das| |1493709315| d| |1493709321| as| |1493709315| d| |1493709315| 1q2| |1493709321| 3| |1493709321| 12| |1493709321| 312| |1493709315| | |1493709321| 3| |1493709315| 123| |1493709321| qw| +----------+----+ only showing top 20 rows

三.問題

嘗試的過程中發現瞭如下問題:

1.停止streaming context的方式不正確

如果使用ssc.stop()停止spark streaming context 會導致zeppelin 服務端的spark context也一併停止,導致出現如下的錯誤,並且在web ui重啟spark interpreter也不解決問題,最後只能重啟整個zeppelin 服務.

java.lang.IllegalStateException: RpcEnv has been stopped

在zeppelin中停止streaming context的正確方式為

ssc.stop(stopSparkContext=false, stopGracefully=true)

2.nc server同一時間只能服務一個client

剛開始嘗試nc用法時,先使用nc -lk 7777啟動server端,再使用nc locahost 7777啟動client,嘗試互相傳送資訊成功,在zeppelin 中streaming任務卻總是讀不到任何資料

原因就在於nv server只能同時服務一個client端.

在多個client 端同時連結server 端時,按建立連結的先後會出現只有一個client埠能接收資料的情況.

如果zeppelin中的streaming任務作為client端接收到了server端的訊息,即使按照1中的方法停止了streaming context,別的client端也依然接受不到訊息,推測可能是server端服務物件仍然為streaming 任務.

3.ERROR [2017-05-02 11:31:12,686] ({dispatcher-event-loop-2} Logging.scala[logError]:70) - Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to loc

alhost:7777 - java.net.ConnectException: Connection refuse

開始時使用localhost 作為socketTextStream中的host地址,卻總是會出現上面的錯誤,但是使用telnet >open  localhost 7777卻可以成功建立連結

把localhost 改成10.104.90.40 則不會再出現上述錯誤,推測原因在於zepplin 使用的spark interpreter設定執行在了cluster mode,在除10.104.90.40 以為的機器上並沒有nc 執行,7777埠也是不通的,所以會出現訪問拒絕.

4.Attempted to use BlockRDD[36063] at socketTextStream at <console>:217 after its blocks have been removed!

當使用val data = sqlContext.sql("select * from message")檢視streaming 處理結果時,查詢後一段時間(幾s)後再次執行查詢任務就會出現如上錯誤.

在zeppelin spark interpreter日誌中發現每個視窗執行完之後都與類似如下的日誌

Removing RDD 37697 from persistence list 

這篇文章中的程式碼分析可見出錯原因在於blockrdd 已經被刪了導致,結合spark streaming dag任務結束後會刪除資料可知

這個報錯的原因在於message這張由registerTempTable 產生的表實際的儲存rdd已經被刪除導致.

5.hive接入問題,不解

INFO [2017-04-28 22:15:10,455] ({pool-2-thread-2} HiveMetaStoreClient.java[open]:376) - Trying to connect to metastore with URI thrift://10.104.90.40:9083

WARN [2017-04-28 22:15:10,456] ({pool-2-thread-2} UserGroupInformation.java[getGroupNames]:1492) - No groups available for user heyang.wang

WARN [2017-04-28 22:15:10,462] ({pool-2-thread-2} HiveMetaStoreClient.java[open]:444) - set_ugi() not successful, Likely cause: new client talking to old server. Continuing without it.

5.1假期前一度出現的問題,只要對rdd進行相關action類操作如rdd.toDf,rdd.collect等就會觸發,但是當時hive cli和beeline都可以正常訪問hive,hive metastore 日誌中也沒有報錯資訊.

但是5.1假期或卻又沒問題了.

四.嘗試在livy中執行spark-streaming任務

經過測試,在livy中可以以同樣的方式使用spark streaming,只需要改%spark為%livy.spark



五.接入kafka資料來源(未完)

參考來源: