Spark Streaming Custom Receivers
阿新 • Published: 2019-01-03
```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class JavaCustomReceiver extends Receiver<String> {

  String host = null;
  int port = -1;

  public JavaCustomReceiver(String host_, int port_) {
    super(StorageLevel.MEMORY_AND_DISK_2());
    host = host_;
    port = port_;
  }

  @Override
  public void onStart() {
    // Start the thread that receives data over a connection
    new Thread() {
      @Override
      public void run() {
        receive();
      }
    }.start();
  }

  @Override
  public void onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private void receive() {
    Socket socket = null;
    String userInput = null;

    try {
      // connect to the server
      socket = new Socket(host, port);

      BufferedReader reader = new BufferedReader(
          new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));

      // Until stopped or connection broken continue reading
      while (!isStopped() && (userInput = reader.readLine()) != null) {
        System.out.println("Received data '" + userInput + "'");
        store(userInput); // store a single record at a time
      }
      reader.close();
      socket.close();

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again");
    } catch (ConnectException ce) {
      // restart if could not connect to server
      restart("Could not connect", ce);
    } catch (Throwable t) {
      // restart if there is any other error
      restart("Error receiving data", t);
    }
  }
}
```
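To try a receiver like this locally, something has to listen on host:port and write newline-delimited text, because receive() treats each readLine() result as one record. Below is a minimal sketch of such a server using only the JDK. The class LineServer, its constructor, and its port() method are hypothetical helpers for illustration and are not part of Spark.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical test helper, not part of Spark: sends the same newline-delimited
// lines to every client that connects, then closes that client's connection.
final class LineServer implements AutoCloseable {
  private final ServerSocket serverSocket;

  LineServer(int port, List<String> lines) {
    try {
      // Bind to the loopback address only; port 0 lets the OS pick a free port.
      serverSocket = new ServerSocket(port, 50, InetAddress.getLoopbackAddress());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    Thread acceptLoop = new Thread(() -> {
      while (!serverSocket.isClosed()) {
        try (Socket client = serverSocket.accept();
             PrintWriter out = new PrintWriter(
                 new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8), true)) {
          for (String line : lines) {
            out.println(line); // one record per line, as readLine() expects
          }
        } catch (IOException e) {
          // accept() fails once close() has been called; the loop condition then exits.
        }
      }
    });
    acceptLoop.setDaemon(true); // do not keep the JVM alive because of this thread
    acceptLoop.start();
  }

  /** The actual port, useful when the server was created with port 0. */
  int port() {
    return serverSocket.getLocalPort();
  }

  @Override
  public void close() {
    try {
      serverSocket.close();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

For example, new LineServer(0, List.of("hello world")) picks a free port, and a receiver created with new JavaCustomReceiver("localhost", server.port()) would connect to it. Because this server closes each connection after writing its lines, the receiver reaches end of stream, calls restart(...), reconnects, and receives the same lines again.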
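Using a Custom Receiver in Spark Streaming

A custom receiver can be used in a Spark Streaming application by calling streamingContext.receiverStream(<instance of the custom receiver>), which creates an input DStream, as shown below: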
```java
// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = customReceiverStream.flatMap(new FlatMapFunction<String, String>() { ... });
...
```
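Receiver Reliability

As discussed in the Spark Streaming Programming Guide, there are two kinds of receivers, distinguished by their reliability and fault-tolerance semantics:

1. Reliable receiver: for reliable sources that allow sent data to be acknowledged, a reliable receiver correctly acknowledges to the source that the data has been received and reliably stored in Spark. Implementing a reliable receiver usually requires careful thought about the semantics of the source's acknowledgements.
2. Unreliable receiver: an unreliable receiver does not send acknowledgements to the source. It can be used for sources that do not support acknowledgement, or for reliable sources when the complexity of acknowledgement is not needed.

To implement a reliable receiver, you must use store(multiple-records) to store data. This flavor of store() is a blocking call that returns only after all the given records have been stored in Spark, so the receiver can safely acknowledge the source once the call returns.

An unreliable receiver can instead store records one at a time with store(single-record), as the JavaCustomReceiver example does. It gives up the guarantee of store(multiple-records) but has the following advantages:

- The system takes care of chunking the data into appropriately sized blocks;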
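- The system takes care of controlling the receiving rate, if rate limits have been specified;
- Unreliable receivers are therefore simpler to implement than reliable receivers.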
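The acknowledge-after-store pattern of a reliable receiver can be modeled without Spark. The sketch below is a simplified, single-threaded simulation, not Spark's API: AckingSource, BlockingStore, and ReliableReceiverLoop are hypothetical classes, the source is assumed to redeliver anything not yet acknowledged, and BlockingStore.storeAll stands in for the blocking store(multiple-records) call (in a real receiver, one of Receiver's multi-record store(...) overloads).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an acknowledging source (e.g. a message queue):
// a message stays pending until it is acknowledged, and pending messages
// are delivered again on the next poll.
final class AckingSource {
  private final List<String> messages;
  private int ackedUpTo = 0; // index of the first unacknowledged message

  AckingSource(List<String> messages) {
    this.messages = new ArrayList<>(messages);
  }

  /** Returns up to maxBatch messages, starting at the first unacknowledged one. */
  List<String> poll(int maxBatch) {
    int end = Math.min(messages.size(), ackedUpTo + maxBatch);
    return new ArrayList<>(messages.subList(ackedUpTo, end));
  }

  void ack(int count) {
    ackedUpTo += count;
  }

  boolean fullyAcked() {
    return ackedUpTo == messages.size();
  }
}

// Hypothetical stand-in for the blocking store(multiple-records): it either
// stores the whole batch before returning or throws. The first
// failuresToSimulate calls fail without storing anything.
final class BlockingStore {
  private final List<String> stored = new ArrayList<>();
  private int failuresLeft;

  BlockingStore(int failuresToSimulate) {
    this.failuresLeft = failuresToSimulate;
  }

  void storeAll(List<String> batch) {
    if (failuresLeft > 0) {
      failuresLeft--;
      throw new IllegalStateException("simulated storage failure");
    }
    stored.addAll(batch);
  }

  List<String> stored() {
    return stored;
  }
}

final class ReliableReceiverLoop {
  /** Acknowledges each batch only after the blocking store has returned. */
  static void run(AckingSource source, BlockingStore store, int maxBatch) {
    // Terminates here only because BlockingStore fails a bounded number of times.
    while (!source.fullyAcked()) {
      List<String> batch = source.poll(maxBatch);
      try {
        store.storeAll(batch);    // returns only after the whole batch is stored
        source.ack(batch.size()); // now it is safe to acknowledge
      } catch (IllegalStateException e) {
        // Not acknowledged, so the source delivers the same batch again.
      }
    }
  }
}
```

Running ReliableReceiverLoop.run on five messages with new BlockingStore(1) and a batch size of 2 makes the first storeAll fail, so the first batch is never acknowledged and is delivered again; all five messages end up stored. In a real system a failure partway through storing could leave part of a batch stored before redelivery, so this pattern prevents data loss but can produce duplicates.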