1. 程式人生 > >第102講: 動手實戰Spark Streaming自定義Receiver並進行除錯和測試

第102講: 動手實戰Spark Streaming自定義Receiver並進行除錯和測試

有興趣想學習國內整套Spark+Spark Streaming+Machine learning頂級課程的,可加我qq  471186150。共享視訊,價效比超高!

1:SparkStreaming雖然說已經支援了很多不同型別的資料來源。但是有時候可能我們的一些資料來源非常特殊 ,不是它天然預設支援的,這時候就要自定義Receiver。而自定義Receiver,一般都是基於網路的方式。因為你傳資料的話,一般是從另外一個網路埠傳過來,至於傳的協議是另外一碼事。

2:從本質上來說,SparkStreaming中的所有Receiver,都是自定義的Receiver。所以你要想自定義一個Receiver,最最簡單的方式,你就是看下已有的Receiver怎麼去實現。

具體步驟:http://spark.apache.org/docs/latest/streaming-custom-receivers.html

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver"
) { override def run() { receive() }//開啟執行緒調receiver()方法 }.start() } def onStop() { // There is nothing much to do as the thread calling receive() // is designed to stop by itself if isStopped() returns false } /** Create a socket connection and receive data until receiver is stopped */
private def receive() { var socket: Socket = null var userInput: String = null try { // Connect to host:port,Receiver的時候就連上Socket socket = new Socket(host, port) // Until stopped or connection broken continue reading val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")) userInput = reader.readLine() while(!isStopped && userInput != null) { store(userInput) userInput = reader.readLine()//每讀一行,存一次,一直迴圈 } reader.close() socket.close() // Restart in an attempt to connect again when server is active again restart("Trying to connect again")//先stop,然後再start() } catch { case e: java.net.ConnectException => // restart if could not connect to server restart("Error connecting to " + host + ":" + port, e) case t: Throwable => // restart if there is any other error restart("Error receiving data", t) } } }
上面就已經自定義完了一個Receiver,下面就new出它的物件,傳進去。因為返回的是Dstream,以前對Dstream怎麼操作,繼續怎麼操作,這裡先從flatMap開始。
// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = lines.flatMap(_.split(" "))
...

相關推薦

102 動手實戰Spark Streaming定義Receiver進行除錯測試

有興趣想學習國內整套Spark+Spark Streaming+Machine learning頂級課程的,可加我qq  471186150。共享視訊,價效比超高! 1:SparkStreaming雖然說已經支援了很多不同型別的資料來源。但是有時候可能我們的一些資料來源非

Spark Streaming 定義接收器

public class JavaCustomReceiver extends Receiver<String> { String host = null; int port = -1; public JavaCustomReceiver(String host_ , int po

127Hadoop叢集管理之安全模式解析及動手實戰學習筆記

第127講:Hadoop叢集管理之安全模式解析及動手實戰學習筆記 hadoop在啟動時namenode會把fsimage載入進記憶體,同時和edits內容合併,以此建立整個檔案系統的元資料的映象(記憶體級別),所以客戶端可以通過namenode訪問檔案系統的資訊。完成後變成

《零基礎入門學習Python》054論一隻爬蟲的自我修養2實戰

目錄 0. 請寫下這一節課你學習到的內容:格式不限,回憶並複述是加強記憶的好方式! 測試題 0. urlopen() 方法的 timeout 引數用於設定什麼? 1. 如何從 urlopen() 返回的物件中獲取 HTTP 狀態碼? 2. 在客戶端和伺服器之間進行請求-響應時

014Scala中MapHashMap原始碼剖析及程式碼實踐(從1000個程式碼案例中學習人工智慧大資料實戰)

第014講:Scala中Map和HashMap原始碼剖析及程式碼實踐/** * A generic trait for immutable maps. Concrete classes have to provide * functionality for the abs

1一週學會linux實戰(第一天)基礎介紹

一 linux特點:1 免費、 開源、2 支援多執行緒,多使用者3 安全性好4 對記憶體和檔案的管理非常好。缺點:操作起來相對麻煩些。二:1960年,電腦大且貴,MIT開發能容納30人使用的分時作業系統;1965年,MIT,GE,BELL制定火星計劃,使電腦能容納300同時使

Spark定製班29課深入理解Spark 2.x中的Structured Streaming內幕

本期內容: 1. 新型的Spark Streaming思維 2. Structured Streaming內幕 Spark 2.0 仍有bug,不適合於生成環境。只用於測試。 Spark 2.X提出了continuous application(連續的應用程式)的概念,非

102CSS3實現立方體旋轉

right https mes abs absolute 變換 class type auto CSS3實現立方體旋轉 1 <!DOCTYPE html> 2 <html lang="en"> 3 <head> 4 &l

011一個打了激素的數組[02]

nbsp img inf bubuko ice slice 知識 http bsp slice: member[1:3] 課後作業: 0. 2,9,71. list1[0]= 1list1[0:1] = list1[1] 2. 方法一: >>

KafkaZK+Kafka+Spark Streaming集群環境搭建(三)安裝spark2.2.1

node word clas 執行 選擇 dir clust 用戶名 uil 如何配置centos虛擬機請參考《Kafka:ZK+Kafka+Spark Streaming集群環境搭建(一)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。》 如

KafkaZK+Kafka+Spark Streaming集群環境搭建(九)安裝kafka_2.11-1.1.0

itl CA blog tor line cat pre PE atan 如何搭建配置centos虛擬機請參考《Kafka:ZK+Kafka+Spark Streaming集群環境搭建(一)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。》 如

KafkaZK+Kafka+Spark Streaming集群環境搭建(二)VMW安裝四臺CentOS,實現本機與它們能交互,虛擬機內部實現可以上網。

centos 失敗 sco pan html top n 而且 div href Centos7出現異常:Failed to start LSB: Bring up/down networking. 按照《Kafka:ZK+Kafka+Spark Streaming集群環

KafkaZK+Kafka+Spark Streaming集群環境搭建(十三)定義一個avro schema使用comsumer發送avro字符流,producer接受avro字符流解析

finall ges records ring ack i++ 一個 lan cde 參考《在Kafka中使用Avro編碼消息:Consumer篇》、《在Kafka中使用Avro編碼消息:Producter篇》 pom.xml <depende

KafkaZK+Kafka+Spark Streaming集群環境搭建(十七)待整理

lan post -a 客戶端 客戶 struct bsp www get redis按照正則批量刪除key redis客戶端--jedis 在Spark結構化流readStream、writeStream 輸入輸出,及過程ETL Spark Structur

KafkaZK+Kafka+Spark Streaming集群環境搭建(十九)待整理

set dstream 搭建 details 編程指南 .com .cn csdn read redis按照正則批量刪除key redis客戶端--jedis 在Spark結構化流readStream、writeStream 輸入輸出,及過程ETL Spark St

學習筆記-小甲魚Python3學習改進我們的小遊戲

import lazy 打印 變量 lua while語句 表達式 val 測試題 測試題0.請問以下代碼會打印多少次“我愛魚C”?while 'C': print('我愛魚C')當while語句中條件為真的時候,會無限循環下去。所以“

學習筆記-小甲魚Python3學習python之常用操作符

mar 邏輯 .... 運算操作 == 整數 image 臺階 size 常用操作符運算操作符:加+ 減- 乘* 除/ 余% 冪運算** 地板除//比較操作符: < ,> ,<=,>=,==,!=邏輯操作符: and,or,not優先級:冪運算符有點

學習筆記-小甲魚Python3學習了不起的分支循環

退出 中心 用戶 鼠標位置 == 是否 淡出 true 循環 打飛機框架加載背景音樂播放背景音樂(設置單曲循環)我方飛機產生interval = 0while True: if 用戶是否電擊關閉遊戲窗口按鈕: 退出遊戲 interval += 1

學習筆記-小甲魚Python3學習了不起的分支循環2

false 成績 pytho 問題 成員 報錯 nsh abc 語法 按照100分制,90分以上成績為A,80到90為B,60到80為C,60以下為D,寫一個程序,當用戶輸入分數,自動轉換為ABCD的形式打印。使用if 條件:...elif 條件:...else...循環f

學習筆記-小甲魚Python3學習了不起的分支循環3

接收 實現 舉例 默認值 app 立方和 課後作業 bsp swe while循環:當條件真時,執行循環體while 條件: 循環體for循環:for 目標 in 表達式: 循環體舉例:>>> fruits = ['apple'