Flink in Practice (101): Two-Stream Join (Part 3): Implementing Inner Join, Left Join, and Right Join on Two Streams
阿新 • Published: 2020-12-04
Source: https://mp.weixin.qq.com/s/mO2h_HNzx2rwFnOXlRDAJQ
Introduction
The previous article covered JoinedStreams and CoGroupedStreams, for example in the following code:
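```scala
dataStream.join(otherStream)
  // The DataStream join takes key-selector functions, not field positions:
  // here the first tuple field of dataStream is matched against the second of otherStream
  .where(_._1).equalTo(_._2)
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .apply { ... }
```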
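Because a joined or co-grouped stream is evaluated inside a specific window and on matching keys, a two-stream join in Flink always operates per key. The `join` operator itself only emits pairs for which both streams have elements with the same key in the same window, that is, inner-join semantics.

Accordingly, Inner, Left, and Right Join on two streams really mean joins scoped to a particular window: the window is the unit of the join. If neither stream has data for a key in a window, nothing happens for that key and window. To express Left or Right semantics you need `coGroup`, whose function is also invoked when only one of the two streams has data for the key in that window.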
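To make "the window is the unit of the join" concrete, here is a plain-Scala sketch with no Flink dependency. It buckets keyed records from two streams into tumbling windows aligned to the epoch with a zero offset, which is how `TumblingEventTimeWindows.of(size)` lays out windows `[start, start + size)`. The object and method names (`WindowScope`, `windowStart`, `buckets`) are hypothetical, and non-negative millisecond timestamps are assumed.

```scala
object WindowScope {
  // Start of the tumbling window containing `ts`.
  // Assumption: window offset 0 and ts >= 0 (epoch milliseconds).
  def windowStart(ts: Long, size: Long): Long = ts - ts % size

  // Buckets (key, timestamp) records of two streams by (key, window start).
  // A bucket exists only if at least one stream has a record in it, mirroring
  // the fact that a windowed coGroup is only invoked for non-empty (key, window) pairs.
  def buckets[K](left: Seq[(K, Long)], right: Seq[(K, Long)], size: Long)
      : Map[(K, Long), (Seq[Long], Seq[Long])] = {
    val l = left.groupBy { case (k, ts) => (k, windowStart(ts, size)) }
    val r = right.groupBy { case (k, ts) => (k, windowStart(ts, size)) }
    (l.keySet ++ r.keySet).map { bucket =>
      val lts = l.getOrElse(bucket, Seq.empty[(K, Long)]).map(_._2)
      val rts = r.getOrElse(bucket, Seq.empty[(K, Long)]).map(_._2)
      bucket -> ((lts, rts))
    }.toMap
  }
}
```

With 3-second windows, a key that has records on only one side still gets a bucket (that is where Left or Right rows come from), while a (key, window) pair with no records on either side never appears at all.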
Implementation
Here I receive data from two sockets to simulate the two streams. The program takes three arguments (host name and two ports):
```scala
if (args.length != 3) {
  System.err.println("USAGE:\nSocketTextStreamJoinType <hostname> <port1> <port2>")
  return
}
val hostName = args(0)
val port1 = args(1).toInt
val port2 = args(2).toInt
```
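The check above only validates the argument count; a non-numeric port still throws `NumberFormatException` from `toInt`. As a possible variation (not the article's code), the three arguments can be validated in one place with pattern matching and `scala.util.Try`. `JoinArgs` and `parse` are hypothetical names.

```scala
import scala.util.Try

object JoinArgs {
  // Hypothetical helper: validate "<hostname> <port1> <port2>".
  // Returns None instead of throwing when the count is wrong or a port is not an integer.
  def parse(args: Array[String]): Option[(String, Int, Int)] = args match {
    case Array(host, p1, p2) =>
      for {
        port1 <- Try(p1.toInt).toOption
        port2 <- Try(p2.toInt).toOption
      } yield (host, port1, port2)
    case _ => None
  }
}
```

In `main`, a `None` result would print the same usage message and return.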
Next, we create two case classes to model the records arriving on the two sockets:
```scala
case class StockTransaction(tx_time: String, tx_code: String, tx_value: Double)
case class StockSnapshot(md_time: String, md_code: String, md_value: Double)
```
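Each socket line has the form `time,code,value`. The sketch below shows the parsing and event-time extraction that the full program performs inside `map` and `assignAscendingTimestamps`, as plain Scala. It deliberately swaps `SimpleDateFormat` for `java.time.DateTimeFormatter`, which is immutable and thread-safe. The helper names are hypothetical, and interpreting the times as UTC is an assumption (the article's `SimpleDateFormat` uses the JVM's default time zone).

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Same shapes as the article's case classes.
case class StockTransaction(tx_time: String, tx_code: String, tx_value: Double)
case class StockSnapshot(md_time: String, md_code: String, md_value: Double)

object StockParsing {
  // Same pattern as the article's SimpleDateFormat.
  private val Format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

  // Hypothetical helpers: split "time,code,value" into the case classes.
  // The code stays a String so leading zeros such as "000001" are preserved.
  def parseTransaction(line: String): StockTransaction = {
    val tokens = line.split(",").map(_.trim)
    StockTransaction(tokens(0), tokens(1), tokens(2).toDouble)
  }

  def parseSnapshot(line: String): StockSnapshot = {
    val tokens = line.split(",").map(_.trim)
    StockSnapshot(tokens(0), tokens(1), tokens(2).toDouble)
  }

  // Event time in epoch milliseconds. Assumption: times are read as UTC.
  def eventTimeMillis(time: String): Long =
    LocalDateTime.parse(time, Format).toInstant(ZoneOffset.UTC).toEpochMilli
}
```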
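The last thing to note is how Inner Join, Left Join, and Right Join are implemented. Here `coGroup` is used: inside the `CoGroupFunction`, we check whether each of the two `Iterable` collections (one per stream, for the current key and window) is empty. The inner join, for example: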
```scala
// Emit the cross product only when both sides have data for this key and window
if (scalaT1.nonEmpty && scalaT2.nonEmpty) {
  for (transaction <- scalaT1) {
    for (snapshot <- scalaT2) {
      out.collect((transaction.tx_code, transaction.tx_time, snapshot.md_time,
        transaction.tx_value, snapshot.md_value, "Inner Join Test"))
    }
  }
}
```
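Note that in the full program below, `LeftJoinFunction` only emits transactions whose (key, window) had no snapshot at all, and `RightJoinFunction` the reverse; together with `InnerJoinFunction` they cover the three cases of a full outer join. A left outer join in the usual SQL sense returns the matched pairs *and* the unmatched left rows from one operator. The plain-Scala sketch below puts each variant into a single function and uses `Option` instead of the article's `""` / `0.0` placeholders for the missing side. `Seq` stands in for the two `Iterable`s that Flink hands to `coGroup` for one key and one window; the object and method names are hypothetical.

```scala
object CoGroupJoins {
  // Inner join: cross product; empty when either side is empty.
  def inner[A, B](left: Seq[A], right: Seq[B]): Seq[(A, B)] =
    for (a <- left; b <- right) yield (a, b)

  // Left outer join: matched pairs, or every left row with None when the right side is empty.
  def leftOuter[A, B](left: Seq[A], right: Seq[B]): Seq[(A, Option[B])] =
    if (right.isEmpty) left.map(a => (a, Option.empty[B]))
    else for (a <- left; b <- right) yield (a, Some(b))

  // Right outer join: the mirror image of leftOuter.
  def rightOuter[A, B](left: Seq[A], right: Seq[B]): Seq[(Option[A], B)] =
    if (left.isEmpty) right.map(b => (Option.empty[A], b))
    else for (a <- left; b <- right) yield (Some(a), b)

  // Full outer join: the union of the article's three functions.
  def fullOuter[A, B](left: Seq[A], right: Seq[B]): Seq[(Option[A], Option[B])] =
    if (left.isEmpty) right.map(b => (Option.empty[A], Some(b)))
    else if (right.isEmpty) left.map(a => (Some(a), Option.empty[B]))
    else for (a <- left; b <- right) yield (Some(a), Some(b))
}
```

Inside a real `CoGroupFunction`, the same bodies would run on `T1.asScala.toList` and `T2.asScala.toList`, emitting each resulting row through `out.collect`.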
Complete code example
```scala
package wikiedits

import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object InnerLeftRightJoinTest {

  // *************************************************************************
  // PROGRAM
  // *************************************************************************

  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      System.err.println("USAGE:\nSocketTextStreamJoinType <hostname> <port1> <port2>")
      return
    }
    val hostName = args(0)
    val port1 = args(1).toInt
    val port2 = args(2).toInt

    /**
     * Get the execution environment and set the TimeCharacteristic
     */
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream1 = env.socketTextStream(hostName, port1)
    val dataStream2 = env.socketTextStream(hostName, port2)

    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

    /**
     * Operators
     * Input format:
     *   TX: 2016-07-28 13:00:01.000,000002,10.2
     *   MD: 2016-07-28 13:00:00.000,000002,10.1
     * Since this is a test, ascending watermarks are used
     * (i.e. the events' Event Time is assumed to arrive in ascending order).
     */
    val dataStreamMap1 = dataStream1.map(f => {
      val tokens1 = f.split(",")
      StockTransaction(tokens1(0), tokens1(1), tokens1(2).toDouble)
    }).assignAscendingTimestamps(f => format.parse(f.tx_time).getTime)

    val dataStreamMap2 = dataStream2.map(f => {
      val tokens2 = f.split(",")
      StockSnapshot(tokens2(0), tokens2(1), tokens2(2).toDouble)
    }).assignAscendingTimestamps(f => format.parse(f.md_time).getTime)

    /**
     * Join operation, restricted to 3-second Event Time windows
     */
    val joinedStream = dataStreamMap1
      .coGroup(dataStreamMap2)
      .where(_.tx_code)
      .equalTo(_.md_code)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))

    val innerJoinedStream = joinedStream.apply(new InnerJoinFunction)
    val leftJoinedStream = joinedStream.apply(new LeftJoinFunction)
    val rightJoinedStream = joinedStream.apply(new RightJoinFunction)

    innerJoinedStream.name("InnerJoinedStream").print()
    leftJoinedStream.name("LeftJoinedStream").print()
    rightJoinedStream.name("RightJoinedStream").print()

    env.execute("3 Type of Double Stream Join")
  }

  // *************************************************************************
  // USER FUNCTIONS
  // *************************************************************************

  case class StockTransaction(tx_time: String, tx_code: String, tx_value: Double)
  case class StockSnapshot(md_time: String, md_code: String, md_value: Double)

  class InnerJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] {
    override def coGroup(T1: java.lang.Iterable[StockTransaction],
                         T2: java.lang.Iterable[StockSnapshot],
                         out: Collector[(String, String, String, Double, Double, String)]): Unit = {
      // Convert the Java Iterables to Scala collections, whose operations are more concise
      import scala.collection.JavaConverters._
      val scalaT1 = T1.asScala.toList
      val scalaT2 = T2.asScala.toList

      // Inner Join: both sides have data for the same key in the same time window
      if (scalaT1.nonEmpty && scalaT2.nonEmpty) {
        for (transaction <- scalaT1) {
          for (snapshot <- scalaT2) {
            out.collect((transaction.tx_code, transaction.tx_time, snapshot.md_time,
              transaction.tx_value, snapshot.md_value, "Inner Join Test"))
          }
        }
      }
    }
  }

  class LeftJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] {
    override def coGroup(T1: java.lang.Iterable[StockTransaction],
                         T2: java.lang.Iterable[StockSnapshot],
                         out: Collector[(String, String, String, Double, Double, String)]): Unit = {
      // Convert the Java Iterables to Scala collections
      import scala.collection.JavaConverters._
      val scalaT1 = T1.asScala.toList
      val scalaT2 = T2.asScala.toList

      // Left side only: transactions of this key and window with no matching snapshot
      if (scalaT1.nonEmpty && scalaT2.isEmpty) {
        for (transaction <- scalaT1) {
          out.collect((transaction.tx_code, transaction.tx_time, "",
            transaction.tx_value, 0.0, "Left Join Test"))
        }
      }
    }
  }

  class RightJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] {
    override def coGroup(T1: java.lang.Iterable[StockTransaction],
                         T2: java.lang.Iterable[StockSnapshot],
                         out: Collector[(String, String, String, Double, Double, String)]): Unit = {
      // Convert the Java Iterables to Scala collections
      import scala.collection.JavaConverters._
      val scalaT1 = T1.asScala.toList
      val scalaT2 = T2.asScala.toList

      // Right side only: snapshots of this key and window with no matching transaction
      if (scalaT1.isEmpty && scalaT2.nonEmpty) {
        for (snapshot <- scalaT2) {
          out.collect((snapshot.md_code, "", snapshot.md_time,
            0.0, snapshot.md_value, "Right Join Test"))
        }
      }
    }
  }
}

/**
 * Test data
 *
 * Transaction:
 * 2016-07-28 13:00:01.820,000001,10.2
 * 2016-07-28 13:00:01.260,000001,10.2
 * 2016-07-28 13:00:02.980,000001,10.1
 * 2016-07-28 13:00:03.120,000001,10.1
 * 2016-07-28 13:00:04.330,000001,10.0
 * 2016-07-28 13:00:05.570,000001,10.0
 * 2016-07-28 13:00:05.990,000001,10.0
 * 2016-07-28 13:00:14.000,000001,10.1
 * 2016-07-28 13:00:20.000,000001,10.2
 *
 * Snapshot:
 * 2016-07-28 13:00:01.000,000001,10.2
 * 2016-07-28 13:00:04.000,000001,10.1
 * 2016-07-28 13:00:07.000,000001,10.0
 * 2016-07-28 13:00:16.000,000001,10.1
 */
```
Testing
First, open two socket listeners, on ports 9998 and 9999:

```
root@master:~# nc -lk 9998
root@master:~# nc -lk 9999
```
Next, package the program and deploy it to the cluster:

```
mvn clean package
```
Then type the simulated data into the sockets (transactions on port 9998, snapshots on port 9999):

```
root@master:~# nc -lk 9998
2016-07-28 13:00:01.820,000001,10.2
2016-07-28 13:00:01.260,000001,10.2
2016-07-28 13:00:02.980,000001,10.1
2016-07-28 13:00:04.330,000001,10.0
2016-07-28 13:00:05.570,000001,10.0
2016-07-28 13:00:05.990,000001,10.0
2016-07-28 13:00:14.000,000001,10.1
2016-07-28 13:00:20.000,000001,10.2
```

```
root@master:~# nc -lk 9999
2016-07-28 13:00:01.000,000001,10.2
2016-07-28 13:00:04.000,000001,10.1
2016-07-28 13:00:07.000,000001,10.0
2016-07-28 13:00:16.000,000001,10.1
```
Finally, look at the output:
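```
(000001,2016-07-28 13:00:01.820,2016-07-28 13:00:01.000,10.2,10.2,Inner Join Test)
(000001,2016-07-28 13:00:01.260,2016-07-28 13:00:01.000,10.2,10.2,Inner Join Test)
(000001,2016-07-28 13:00:02.980,2016-07-28 13:00:01.000,10.1,10.2,Inner Join Test)
(000001,2016-07-28 13:00:04.330,2016-07-28 13:00:04.000,10.0,10.1,Inner Join Test)
(000001,2016-07-28 13:00:05.570,2016-07-28 13:00:04.000,10.0,10.1,Inner Join Test)
(000001,2016-07-28 13:00:05.990,2016-07-28 13:00:04.000,10.0,10.1,Inner Join Test)
(000001,2016-07-28 13:00:14.000,,10.1,0.0,Left Join Test)
(000001,,2016-07-28 13:00:07.000,0.0,10.0,Right Join Test)
```

The transactions in the windows [13:00:00, 13:00:03) and [13:00:03, 13:00:06) each meet one snapshot, giving six inner-join rows. The 13:00:14.000 transaction has no snapshot in its window (a left-only row), and the 13:00:07.000 snapshot has no transaction in its window (a right-only row). The 13:00:16.000 snapshot and the 13:00:20.000 transaction produce nothing yet: the co-group operator's watermark is the minimum of the two inputs' watermarks, here 13:00:15.999, so their windows have not fired. Because the three join functions run as separate operators with separate print sinks, the relative order of Inner, Left, and Right lines can vary between runs.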
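The window bookkeeping behind this output can be replayed without Flink as a cross-check. The sketch below is a simplified model, not Flink's implementation: each input's ascending watermark is its largest timestamp minus 1 ms, the operator uses the minimum of the two, and a window `[start, end)` fires once that watermark reaches `end - 1`. Assumptions: parallelism 1, all test lines already read, a single key `000001`, and timestamps in milliseconds relative to 13:00:00.000 (a 3-second window boundary, because window starts are multiples of 3 s since the epoch and time-zone offsets are whole minutes). `TestReplay` is a hypothetical name.

```scala
object TestReplay {
  val size = 3000L
  // Millisecond offsets from 13:00:00.000, taken from the test input above.
  val transactions = Seq(1820L, 1260L, 2980L, 4330L, 5570L, 5990L, 14000L, 20000L)
  val snapshots = Seq(1000L, 4000L, 7000L, 16000L)

  def windowStart(ts: Long): Long = ts - ts % size

  // Combined watermark after all input is read. The socket sources never finish,
  // so no final end-of-input watermark flushes the remaining windows.
  val watermark: Long = math.min(transactions.max - 1, snapshots.max - 1)

  val windows: Seq[Long] = (transactions ++ snapshots).map(windowStart).distinct.sorted
  val fired: Seq[Long] = windows.filter(start => start + size - 1 <= watermark)

  // (inner pairs, left-only rows, right-only rows) summed over the fired windows.
  def counts: (Int, Int, Int) = {
    val perWindow = fired.map { start =>
      val t = transactions.count(ts => windowStart(ts) == start)
      val s = snapshots.count(ts => windowStart(ts) == start)
      if (t > 0 && s > 0) (t * s, 0, 0)
      else if (t > 0) (0, t, 0)
      else (0, 0, s)
    }
    (perWindow.map(_._1).sum, perWindow.map(_._2).sum, perWindow.map(_._3).sum)
  }
}
```

The model yields a watermark of 15999, fires the windows starting at 0, 3000, 6000, and 12000 ms, and counts 6 inner, 1 left-only, and 1 right-only row, which matches the eight lines printed above.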
Summary
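In real-world jobs, the coGroup logic is usually more complex. It may need state (key/value or list state), which means extending `RichCoGroupFunction` instead. It also has to deal with latency: the two streams advance in event time at different rates, so a window often fires before one side's data has arrived. These problems require more elaborate handling.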
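One common knob for the latency problem is a bounded-out-of-orderness watermark (for example `WatermarkStrategy.forBoundedOutOfOrderness` in Flink 1.11 and later), which holds the watermark back by a fixed delay so that slower data can still land in its window. The plain-Scala sketch below models only that trade-off; it is an illustration under stated assumptions, not Flink code. It assumes a watermark of "largest timestamp seen minus the bound minus 1 ms", tumbling windows aligned to the epoch, and no allowed lateness, in which case the window operator drops an element whose window's last millisecond the watermark has already passed. `LatenessSketch` and its methods are hypothetical names.

```scala
object LatenessSketch {
  // Bounded-out-of-orderness watermark: largest timestamp seen minus the bound, minus 1 ms.
  // A bound of 0 reproduces the ascending-timestamp watermark used in the article.
  def watermark(maxSeen: Long, boundMillis: Long): Long = maxSeen - boundMillis - 1

  // With no allowed lateness, an element is dropped as late when the tumbling window
  // it belongs to has already fired, i.e. the window's last millisecond (end - 1) is
  // at or below the operator's current watermark (for two inputs, their minimum).
  def isLate(ts: Long, currentWatermark: Long, windowSize: Long): Boolean = {
    val windowEnd = ts - ts % windowSize + windowSize
    windowEnd - 1 <= currentWatermark
  }
}
```

For instance, once timestamps up to 7000 ms have been seen, a record stamped 2500 ms is late under a zero bound (watermark 6999 has passed the end of window [0, 3000)) but still on time with a 5-second bound (watermark 1999). The price is that every window fires 5 seconds later.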