spark 基礎七 spark streaming
spark streaming 使用離散化流DStream作為抽象表示。DStream是隨時間退役而受到的資料的序列,在內部,每個時間區間收到的資料都作為RDD存在,DStream時由這些RDD所組成的序列。DStream支援兩種操作:轉化操作生成新的DStream,輸出操作將資料寫入外部系統。除了提供RDD類似的操作外,還增加了與時間相關的新操作。
與批處理程式不同,streaming需要進行額外的配置保證不間斷工作,即checkpointing機制。
#scala流式篩選
# 從sparkconf建立streamingcontext並指定1s的批處理大小
val ssc = new StreamingContext(conf,Seconds(1))
#連線到本地機器7777埠上後,使用受到的資料建立Dstream
val lines = ssc.socketTextStream("localhost",7777)
#從dstream中篩選出包含字串error的行
val errorlines=lines.filter(_.contains("error'))
errorlines.print()
#java流式篩選
JStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(1));
JavaDStream<String> lines=jssc.socketTextStream("localhost",7777);
JavaDstream<String> errorlines=lines.filter(new Function<String,Boolean>(){
public Boolean call(String line){
return line.contains("error");
}
});
errorLInes.spint();
以上只是設定號了要進行的計算,系統受到資料時計算就會開始。要開始接受資料,需要顯式的呼叫streamingcontext的start方法,執行會在另一個執行緒中進行,需要呼叫awaitTermination來等待流計算完成。
spark streaming 使用微批次的建構,把流式計算當作一系列連續的小規模批處理來對待。每個時間區間內的資料生成一個RDD,DStream是一個RDD序列。
轉化操作
DStream 的轉化操作可以分為無狀態和有狀態兩種
無狀態轉化操作中,每個批次的處理不依賴於之前批次的資料,如map、filter、reducebykey等,會分別應用到DStream中的每個RDD上,如ruducebykey會歸約每個時間區間內的資料但不會歸約不同區間的資料,而有狀態轉化操作會整合不同區間內的資料。
另外提供了transfrom,允許對DStream提供任意一個RDD到RDD的函式,這個函式會在資料流中的每個批次中被呼叫
有狀態轉化操作中,轉化操作需要使用之前批次的資料或者中間結果來計算當前批次的資料,包括基於滑動視窗的轉化操作和追蹤狀態變化的轉化操作。
基於視窗的轉化操作 會在一個比streamingcontext的批次間隔更長的時間範圍內,通過整合多個批次的結果計算出整個視窗的結果,需提供視窗時長和滑動步長,必須為批次間隔的整數倍,視窗時長控制每次計算最近的多少個批次的資料,滑動步長預設值與批次間隔相等,用來控制對新的DSream進行計算的間隔
最常用的window,返回一個新的DStream來表示所請求的視窗操作的結果資料,其生成的DStream中的每個RDD都包含多個批次中資料
#scala
val awindows=adstream.window(seconds(30),(10))
val windowscount=awindows.count()
#java
JavaDStream<ApacheAccessLog> awind=adstream.window(Durations.seconds(30),Durations.seconds(10));
JavaDStream<Integer>windowcounts=awindow.count();
還提供更高效的視窗操作,如 reduceByWindow()和reduceBYKeyAndWindow(),接收歸約函式,在整個視窗上執行。還有一種特殊形式,只考慮新進入視窗的資料和離開視窗的資料,讓spark實現增量計算歸約結果,這種特殊形式需要提供歸約函式的一個逆函式
#scala
val ipdstream = alongdstream.map(logentry=>(logentry.getIpAddress(),1))
val ipcountdstream = ipdstream.reduceByKeyAndWindow(
{(x,y)=>x+y};
{(x,y)=>x-y},
seconds(30),
seconds(10))
#Java
class extractip extends PairFunction<ApacheAccessLog, String,Long>{
public Tuple2<String,Long> call(ApacheAccessLog entry){
return new Tuple2(entry.getIpAddress(),1l);}
}
class addlongs extends Function2<Long,Long,Long>(){
public Long call(Long v1, Long v2){return v1+v2};
}
class substrclong extends Function<Long,Long,Long>(){
public Long call(Long v1, Long v2) {return v1-v2;}
}
JavaPairDStream<String,Long> ipaddressPairDStream=accesslongDStream.mapToPair(new extractIp());
JavaPairDStream<String,Long> ipcountDstream=ipaddressPairDStream.reudceByKeyAndWIndow(
new addlongs(),new subtractlongs(), Durations.seconds(30),Durations.seconds(10));
countBywindow() 返回每個視窗中元素個數的DStream,countByValueAndWindow()返回視窗中每個值的個數
updateStateByKey轉化操作
提供對一個狀態變數的訪問,用於鍵值對形式的DStream。給定一個由(鍵,事件)對構成的DStream,並傳遞一個指定如何根據新的時間更新每個鍵對應狀態的函式,可以構建出新的DStream,其內部資料為(鍵,狀態)對
#scala使用updatestatebykey()跟蹤日誌訊息中各HTTP響應程式碼的技術,鍵時響應程式碼,狀態是代表各響應程式碼計數的證書,事件則是頁面訪問
def updateRunningSum(values:Seq[Long],state:Option[Long])={
Some(state.getOrElse(0L)+VALUES.SIZE)
}
val responseCodeDStream = accesslongDStream.map(log=>(log.getResponseCode(),1L))
val responseCodeCountDStream=responseCodeDStream.updateStateByKey(updateRunningSUm_)
#java
class UpdataRunningSUm implements Function2<List<LOng>,Optional<Long>,Optional<Long>>{
public Optional<Long> call(List<LOng> nums, Optional<Long> current){
long sum= current.or(0L);
return Optional.of(sum +nums.size());
}};
JavaPairDStream<Integer,Long> responseCodeCountDStream= accesslongsDStream.maoToPaie(new PairFunction<ApacheAccessLog, Integer,Long>(){
public Tuple2<Integer,Long>call(ApacheAccessLog log){
return new Tuple2( log.getResponseCode(),1L);
}}).UpdateStateByKey(new UpdateRunningSum());
輸出操作
指定了對流資料經轉化操作得到的資料所要執行的操作,如把結果推入外部資料庫或輸出到螢幕上,與RDD的惰性求值類似,如果一個DStream及其派生出的DStream沒有被執行輸出操作,那這些DStream就都不會被求值
print、save等
#java中將DStream儲存為sequencefile
JavaPairDStream<Text,LongWritable> writableDStream = ipDStream.mapToPair(
new PairFunction<TUple2<String,Long>,Text,LongWritable>(){
public Tuple2<Text,LongWritable>call(Tuple2<String,Long>e){
return new Tuple2(new Text(e.-1()),new LongWritable(e._2()))
}
});
class OutFormat extends SequenceFileOutputFormat<Text,LongWritable>{};
writableDStream.saveAsHadoopFile("outputDir","txt',Text.calss,LongWritable.class,OutFormat.class);
通用的輸出操作 foreachRDD(),用來對DStream中的RDD執行任意計算,在fireachRDD中可以使用我們在spark中實現的所有行動操作。
#scala 使用foreachRDD將資料儲存到外部系統中
ipaddressrequenstcount.foreachRDD{RDD=>
rdd.foreachPartition{partition =>
//開啟到儲存系統的連線
paitition.foreach{item =>
//使用連線把item存到系統中
}
關閉連線
}
}
輸入源
檔案流
val longData = ssc.textFileStream(longDirectory)
JavaDStram<String> logData=jssc.textFileStream(logsDirectory);
除了文字資料,也可以讀如任意Hadoop輸入格式,只需將key、value、InputFormat類提供給spark streaming即可
ssc.fileStream[LongWritable,IntWritable,SequenceFileInputFormat[LOngWritable,IntWritable]](inputDirectory).map{
case(x,y)=>(x.get(),y.get())}