1. 程式人生 > >Spark Streaming 自定義接收器

Spark Streaming 自定義接收器

public class JavaCustomReceiver extends Receiver<String> {

  String host = null;
  int port = -1;

  public JavaCustomReceiver(String host_ , int port_) {
    super(StorageLevel.MEMORY_AND_DISK_2());
    host = host_;
    port = port_;
  }

  public void onStart() {
    // Start the thread that receives data over a connection
    new Thread()  {
      @Override public void run() {
        receive();
      }
    }.start();
  }

  public void onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private void receive() {
    Socket socket = null;
    String userInput = null;

    try {
      // connect to the server
      socket = new Socket(host, port);

      BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));

      // Until stopped or connection broken continue reading
      while (!isStopped() && (userInput = reader.readLine()) != null) {
        System.out.println("Received data '" + userInput + "'");
        store(userInput);
      }
      reader.close();
      socket.close();

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again");
    } catch(ConnectException ce) {
      // restart if could not connect to server
      restart("Could not connect", ce);
    } catch(Throwable t) {
      // restart if there is any other error
      restart("Error receiving data", t);
    }
  }
}

在spark streaming中使用自定義的接收器 自定義接收器可用於在Spark Streaming應用中,通過使用streamingContext.receiverStream(自定義接收器的例項)。如下所示,建立一個輸入Dstream
// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { ... });
...

接收器的可靠性 正如在spark streaming程式設計指南中討論的那樣,基於接收器的可靠性和容錯語義,有兩種型別的接收器: 1 可靠的接收器:對於可靠的訊息來源,允許傳送的資料被確認,一個可靠的接收器正確地確認資料被接收器接收同時被可靠地儲存在spark中。通常,實現可靠的接收器需仔細考量訊息確認的語義。 2 不可靠的接收器:不可靠的接收器不向資料來源傳送確認資訊。它可用於不支援確認機制的資料來源,或者那些可靠的資料來源但是我們不需要其使用複雜的確認機制。 為了實現可靠的接收器,必須要使用store(multiple-records)取儲存資料。這種型別的store()一種阻塞呼叫,只有在所有給定的記錄被儲存在spark裡之後才返回。
如果接收器的配置儲存級別使用複製(預設啟用),則複製完成後這個呼叫返回。因此,它確保了資料被可靠的儲存,和接收器可以現在正確地確認訊息;在複製資料的中間過程中接收器失敗了,這將確保沒有資料丟失(緩衝資料沒有被確認,資料來源將會重新發送)。
不可靠的接收器沒有實現這種邏輯,它可以簡單地從資料來源接收記錄並使用store(single-record)將它們插入(one-at-a-time )。它沒有store(multiple-records)那樣的可靠保證,其優點如下:
  • 系統考慮到將資料轉化為適當大小的塊;
  • 系統考慮到控制接收速率,如果速率限制已指定;
  • 不可靠接收器比可靠接收器更加容易實現;
原文地址:https://spark.apache.org/docs/latest/streaming-custom-receivers.html