第102講: 動手實戰Spark Streaming自定義Receiver並進行除錯和測試
有興趣想學習國內整套Spark+Spark Streaming+Machine learning頂級課程的,可加我qq
471186150。共享視訊,價效比超高!
1:SparkStreaming雖然說已經支援了很多不同型別的資料來源。但是有時候可能我們的一些資料來源非常特殊 ,不是它天然預設支援的,這時候就要自定義Receiver。而自定義Receiver,一般都是基於網路的方式。因為你傳資料的話,一般是從另外一個網路埠傳過來,至於傳的協議是另外一碼事。
2:從本質上來說,SparkStreaming中的所有Receiver,都是自定義的Receiver。所以你要想自定義一個Receiver,最最簡單的方式,你就是看下已有的Receiver怎麼去實現。
具體步驟:http://spark.apache.org/docs/latest/streaming-custom-receivers.html
class CustomReceiver(host: String, port: Int)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver" ) {
override def run() { receive() }//開啟執行緒調receiver()方法
}.start()
}
def onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
private def receive() {
var socket: Socket = null
var userInput: String = null
try {
// Connect to host:port,Receiver的時候就連上Socket
socket = new Socket(host, port)
// Until stopped or connection broken continue reading
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
userInput = reader.readLine()
while(!isStopped && userInput != null) {
store(userInput)
userInput = reader.readLine()//每讀一行,存一次,一直迴圈
}
reader.close()
socket.close()
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again")//先stop,然後再start()
} catch {
case e: java.net.ConnectException =>
// restart if could not connect to server
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
// restart if there is any other error
restart("Error receiving data", t)
}
}
}
上面就已經自定義完了一個Receiver,下面就new出它的物件,傳進去。因為返回的是Dstream,以前對Dstream怎麼操作,繼續怎麼操作,這裡先從flatMap開始。
// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = lines.flatMap(_.split(" "))
...
相關推薦
第102講: 動手實戰Spark Streaming自定義Receiver並進行除錯和測試
有興趣想學習國內整套Spark+Spark Streaming+Machine learning頂級課程的,可加我qq 471186150。共享視訊,價效比超高! 1:SparkStreaming雖然說已經支援了很多不同型別的資料來源。但是有時候可能我們的一些資料來源非
Spark Streaming 自定義接收器
public class JavaCustomReceiver extends Receiver<String> { String host = null; int port = -1; public JavaCustomReceiver(String host_ , int po
第127講:Hadoop叢集管理之安全模式解析及動手實戰學習筆記
第127講:Hadoop叢集管理之安全模式解析及動手實戰學習筆記 hadoop在啟動時namenode會把fsimage載入進記憶體,同時和edits內容合併,以此建立整個檔案系統的元資料的映象(記憶體級別),所以客戶端可以通過namenode訪問檔案系統的資訊。完成後變成
《零基礎入門學習Python》第054講:論一隻爬蟲的自我修養2:實戰
目錄 0. 請寫下這一節課你學習到的內容:格式不限,回憶並複述是加強記憶的好方式! 測試題 0. urlopen() 方法的 timeout 引數用於設定什麼? 1. 如何從 urlopen() 返回的物件中獲取 HTTP 狀態碼? 2. 在客戶端和伺服器之間進行請求-響應時
第014講:Scala中Map和HashMap原始碼剖析及程式碼實踐(從1000個程式碼案例中學習人工智慧和大資料實戰)
第014講:Scala中Map和HashMap原始碼剖析及程式碼實踐/** * A generic trait for immutable maps. Concrete classes have to provide * functionality for the abs
第1講:一週學會linux實戰(第一天)基礎介紹
一 linux特點:1 免費、 開源、2 支援多執行緒,多使用者3 安全性好4 對記憶體和檔案的管理非常好。缺點:操作起來相對麻煩些。二:1960年,電腦大且貴,MIT開發能容納30人使用的分時作業系統;1965年,MIT,GE,BELL制定火星計劃,使電腦能容納300同時使
Spark定製班第29課:深入理解Spark 2.x中的Structured Streaming內幕
本期內容: 1. 新型的Spark Streaming思維 2. Structured Streaming內幕 Spark 2.0 仍有bug,不適合於生成環境。只用於測試。 Spark 2.X提出了continuous application(連續的應用程式)的概念,非
第102天:CSS3實現立方體旋轉
right https mes abs absolute 變換 class type auto CSS3實現立方體旋轉 1 <!DOCTYPE html> 2 <html lang="en"> 3 <head> 4 &l
第 011講:一個打了激素的數組[02]
nbsp img inf bubuko ice slice 知識 http bsp slice: member[1:3] 課後作業: 0. 2,9,71. list1[0]= 1list1[0:1] = list1[1] 2. 方法一: >>
Kafka:ZK+Kafka+Spark Streaming集群環境搭建(三)安裝spark2.2.1
node word clas 執行 選擇 dir clust 用戶名 uil 如何配置centos虛擬機請參考《Kafka:ZK+Kafka+Spark Streaming集群環境搭建(一)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。》 如
Kafka:ZK+Kafka+Spark Streaming集群環境搭建(九)安裝kafka_2.11-1.1.0
itl CA blog tor line cat pre PE atan 如何搭建配置centos虛擬機請參考《Kafka:ZK+Kafka+Spark Streaming集群環境搭建(一)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。》 如
Kafka:ZK+Kafka+Spark Streaming集群環境搭建(二)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。
centos 失敗 sco pan html top n 而且 div href Centos7出現異常:Failed to start LSB: Bring up/down networking. 按照《Kafka:ZK+Kafka+Spark Streaming集群環
Kafka:ZK+Kafka+Spark Streaming集群環境搭建(十三)定義一個avro schema使用comsumer發送avro字符流,producer接受avro字符流並解析
finall ges records ring ack i++ 一個 lan cde 參考《在Kafka中使用Avro編碼消息:Consumer篇》、《在Kafka中使用Avro編碼消息:Producter篇》 pom.xml <depende
Kafka:ZK+Kafka+Spark Streaming集群環境搭建(十七)待整理
lan post -a 客戶端 客戶 struct bsp www get redis按照正則批量刪除key redis客戶端--jedis 在Spark結構化流readStream、writeStream 輸入輸出,及過程ETL Spark Structur
Kafka:ZK+Kafka+Spark Streaming集群環境搭建(十九)待整理
set dstream 搭建 details 編程指南 .com .cn csdn read redis按照正則批量刪除key redis客戶端--jedis 在Spark結構化流readStream、writeStream 輸入輸出,及過程ETL Spark St
學習筆記-小甲魚Python3學習第四講:改進我們的小遊戲
import lazy 打印 變量 lua while語句 表達式 val 測試題 測試題0.請問以下代碼會打印多少次“我愛魚C”?while 'C': print('我愛魚C')當while語句中條件為真的時候,會無限循環下去。所以“
學習筆記-小甲魚Python3學習第六講:python之常用操作符
mar 邏輯 .... 運算操作 == 整數 image 臺階 size 常用操作符運算操作符:加+ 減- 乘* 除/ 余% 冪運算** 地板除//比較操作符: < ,> ,<=,>=,==,!=邏輯操作符: and,or,not優先級:冪運算符有點
學習筆記-小甲魚Python3學習第七講:了不起的分支和循環
退出 中心 用戶 鼠標位置 == 是否 淡出 true 循環 打飛機框架加載背景音樂播放背景音樂(設置單曲循環)我方飛機產生interval = 0while True: if 用戶是否電擊關閉遊戲窗口按鈕: 退出遊戲 interval += 1
學習筆記-小甲魚Python3學習第八講:了不起的分支和循環2
false 成績 pytho 問題 成員 報錯 nsh abc 語法 按照100分制,90分以上成績為A,80到90為B,60到80為C,60以下為D,寫一個程序,當用戶輸入分數,自動轉換為ABCD的形式打印。使用if 條件:...elif 條件:...else...循環f
學習筆記-小甲魚Python3學習第九講:了不起的分支和循環3
接收 實現 舉例 默認值 app 立方和 課後作業 bsp swe while循環:當條件真時,執行循環體while 條件: 循環體for循環:for 目標 in 表達式: 循環體舉例:>>> fruits = ['apple'