1. 程式人生 > >第一個Spark Streaming案例程式

第一個Spark Streaming案例程式

前面的文章大概的介紹了Spark Streaing流式處理框架,說的通俗點,實際上就是在Spark Core的基礎上進行了封裝,然後將小批次的資料進行處理,處理完了程序並不會停止,而是會一直存在,這樣只要有資料進來,就會進行處理,從而實現了流式處理

下面就來一個例項進行感受:

這裡選擇使用Linux下的nc工具來產生socket資料,然後Spark Streaming來進行處理

1.首先放出Spark Streaing的程式碼:

package cn.ysjh

import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}


object SparkStreamingTest {


  def main(args: Array[String]): Unit = {

    //實時計算要建立StreamingContext,StreamingContext是對SparkContext的封裝,
     val conf: SparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[4]")
    val context: SparkContext = new SparkContext(conf)

    //第二個引數是小批次產生的時間間隔,Milliseconds是毫秒
    val streaming: StreamingContext = new StreamingContext(context,Milliseconds(5000))

    //有了SparkContext就可以建立SparkStreaming的抽象DStream
    //從一個socket埠中讀取資料
    val lines: ReceiverInputDStream[String] = streaming.socketTextStream("192.168.220.134",6789)

    //對DStream進行操作,你操作這個抽象(代理,描述),就像操作一個本地集合一樣
     val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordOne: DStream[(String, Int)] = words.map((_,1))
    val result: DStream[(String, Int)] = wordOne.reduceByKey(_+_)

    //列印結果
    result.print()


    //啟動Spark Streaming程式
    streaming.start()
    //等待優雅的退出
    streaming.awaitTermination()
  }
}

2.在一臺虛擬機器中安裝nc,在實際的系統中安裝nc的命令不同

Ubuntu:

apt-get install netcat

Centos:

yum install nc

3.在IDEA中執行Spark Streaming程式

4.在安裝了nc工具包的虛擬機器上輸入:

nc -lk 6789

然後隨便輸入資料,以空格隔開,就會在IDEA中看到執行的結果了,只要輸入一條資料就會計算統計出一次結果,而且是每隔5秒輸出一次

執行結果示例:

可以很明確的看出Spark Streaming就是一直在執行的Spark程式