flink實戰開發之Join和coGroup的區別和應用
簡介
Join和coGroup都是flinkSQL中用於連線多個流的運算元,但是有一定的區別,推薦能使用coGroup不要使用Join,因為coGroup更強大。下面讓我們簡單看一下兩個運算元的用法
Window Join |
在給定金鑰和公共視窗上連線兩個資料流。 dataStream.join(otherStream) .where(<key selector>).equalTo(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply { ... } |
Window CoGroup |
在給定金鑰和公共視窗上對兩個資料流進行Cogroup。 dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply {} |
CoGrouped原始碼
joinedStream原始碼
對比結論
結論一:.使用時,where裡面要傳入資料流T1要與資料流T2匹配的key,equalTo中傳入T2要和T1匹配的key。
結論二:join和coGroup的最大區別就是apply方法提供的引數型別不一樣,
join的apply
coGroup的apply引數
兩種運算元apply方法中的引數型別不一樣,join中提供的apply方法,引數是T1與T2泛型型別。而coGroup中提供的apply方法,引數是Iterator[T1]與Iterator[2]迭代器,基於這2種方式,如果我們的邏輯不僅僅是對一條record做處理,而是要與上一record或更復雜的判斷與比較,甚至是對結果排序,那麼join中的apply方法顯得比較困難,所以推薦使用coGroup
結論三:apply方法的好處
我們想要在Flink中實現實時的流計算,就可以通過joinedStream或coGroupedStream來實現。但是在join之後實施更復雜的運算,例如判斷、迭代等,僅僅通過SQL實現,恐怕會很麻煩,只能通過PL/SQL塊來實現,而Flink提供了apply方法,使用者可以自己編寫複雜的函式來實現。
案例 雙流jion
package flinkSQL
import java.text.SimpleDateFormat
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._
/**
* Created by ${WangZhiHua} on 2018/11/13
*/
object JoinDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//獲取介面傳送的資料
val dataStream1 = env.readTextFile("C:/flink_data/scoket1.txt")
val dataStream2 = env.readTextFile("C:/flink_data/scoket2.txt")
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
//使用樣例類StockTransaction封裝獲取的資料
val dataStreamMap1 = dataStream1.map(f => {
val tokens1 = f.split(",")
StockTransaction(tokens1(0), tokens1(1), tokens1(2).toDouble)
})
.assignAscendingTimestamps(f => format.parse(f.tx_time).getTime)
//使用樣例類StockSnapshot封裝獲取的資料
val dataStreamMap2 = dataStream2.map(f => {
val tokens2 = f.split(",")
StockSnapshot(tokens2(0), tokens2(1), tokens2(2).toDouble)
})
.assignAscendingTimestamps(f => format.parse(f.md_time).getTime)
/**
* 進行雙流join
* 限定範圍是:3秒鐘的Event time時間視窗
*/
val joinStream = dataStreamMap1.coGroup(dataStreamMap2)
.where(_.tx_code)
.equalTo(_.md_code)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
val innerJoinStream = joinStream.apply(new InnerJoinFunction)
innerJoinStream.name("innerJoin").print()
print("===================== end =========================")
env.execute("join demo")
}
}
//定義樣例類封裝接收的資料
case class StockTransaction(tx_time:String, tx_code:String,tx_value:Double)
case class StockSnapshot(md_time:String, md_code:String,md_value:Double)
//定義一個內連線函式,繼承CoCroup
class InnerJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)]{
override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double,String)]): Unit = {
/**
* 將Java中的Iterable物件轉換為Scala的Iterable
* scala的集合操作效率高,簡潔
*/
import scala.collection.JavaConverters._
val scalaT1 = T1.asScala.toList
val scalaT2 = T2.asScala.toList
/**
* Inner Join要比較的是同一個key下,同一個時間視窗內的資料
*/
if(scalaT1.nonEmpty && scalaT2.nonEmpty){
for(transaction <- scalaT1){
for(snapshot <- scalaT2){
out.collect(transaction.tx_code,transaction.tx_time, snapshot.md_time,transaction.tx_value,snapshot.md_value,"Inner Join Test")
}
}
}
}
}
class LeftJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {
override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {
/**
* 將Java中的Iterable物件轉換為Scala的Iterable
* scala的集合操作效率高,簡潔
*/
import scala.collection.JavaConverters._
val scalaT1 = T1.asScala.toList
val scalaT2 = T2.asScala.toList
/**
* Left Join要比較的是同一個key下,同一個時間視窗內的資料
*/
if(scalaT1.nonEmpty && scalaT2.isEmpty){
for(transaction <- scalaT1){
out.collect(transaction.tx_code,transaction.tx_time, "",transaction.tx_value,0,"Left Join Test")
}
}
}
}
class RightJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {
override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {
/**
* 將Java中的Iterable物件轉換為Scala的Iterable
* scala的集合操作效率高,簡潔
*/
import scala.collection.JavaConverters._
val scalaT1 = T1.asScala.toList
val scalaT2 = T2.asScala.toList
/**
* Right Join要比較的是同一個key下,同一個時間視窗內的資料
*/
if(scalaT1.isEmpty && scalaT2.nonEmpty){
for(snapshot <- scalaT2){
out.collect(snapshot.md_code, "",snapshot.md_time,0,snapshot.md_value,"Right Join Test")
}
}
}
}
相關推薦
flink實戰開發之Join和coGroup的區別和應用
簡介 Join和coGroup都是flinkSQL中用於連線多個流的運算元,但是有一定的區別,推薦能使用coGroup不要使用Join,因為coGroup更強大。下面讓我們簡單看一下兩個運算元的用法 Window Join DataStream,DataSt
開發日常小結(38):MYSQL: left join / right join / join 的使用區別和聯絡
2018年10月19日 目錄 1、JOIN 的概念 有時為了得到完整的結果,我們需要從兩個或更多的表中獲取結果。我們就需要執行 join。 SQL join 用於根
Java之JSONObject存取值以及和HashMap區別, optString()和getString()區別和他的遍歷方式
結論: 1.JSONObject和HashMap用法上是一樣的,用put()方法存對於的Key-values鍵值對,取可用optString(key)和getString(key),get(key),存入的是什麼型別,取出來的時候就是什麼型別 2**.optString()在沒找到k
flink實戰開發----flinkSQL入門大全
flinkSQL概念介紹 Table API & SQL Apache Flink具有兩個關係API - 表API和SQL - 用於統一流和批處理。Table API是Scala和Java的語言整合查詢API,允許以非常直觀的方式組合來自關係運算符的查詢,Table API和SQ
django 開發之模型以及靜態問題和圖片的使用
使用Django的模型,基本步驟: 1.建立model 2.加入到admin.py中去 3.執行生成遷移:python manage.py makemigrations blog 4.執行遷移,生成表單 需要注意的是第二步,一定要加入APP相應的adimn.py 上去,再執行遷移
Flink與SparkStreaming之Counters& Accumulators累加器雙向應用案例實戰-Flink牛刀小試
版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何問題,可隨時聯絡。 1 累加器應用場
Android開發之獲取SIM卡資訊和手機號碼
獲取SIM卡資訊和手機號碼的工具類,記錄一下方便以後使用 import android.content.Context; import android.telephony.TelephonyManager; /** * Created by WangJinyong on 20
大資料開發之Hadoop篇----配置yarn和mapreduce
上一篇部落格中我們已經完成了hdfs的部署,現在我們開始部署yarn了。 我們先使用jps命令來檢視下現在與java相關的程序: 這裡NameNode以後簡稱為NN,DataNode簡稱為DN,而SecondaryNameNodel簡稱為SNN。 我們先切換到had
Python實戰開發之Pyramid Web框架在商城專案中的應用教程
第一講:Python基礎和入門介紹(Web開發基礎) 介紹Web開發技術和Python語言的基礎知識。通過本講的學習,學員可以瞭解Python Web開發技術的基礎和相關常識,為後續學習打下基礎。 第二講:Python語言基礎(運算子與表示式,控制流,函式,模組) 在第一講的基礎上繼續深入學習Pyth
R語言開發之平均值,中位數和眾數了解下
R中的統計分析通過使用許多內建函式來執行的,這些函式大部分是R基礎包的一部分,並且它們將R向量與引數一起作為輸入,並在執行計算後給出結果。 先來看如何求平均值。 平均值是通過取數值的總和併除以資料序列中的值的數量來計算,函式mean()用於在R中計算平均值,語法如下:
混合開發之webView載入html,android 和 html之間進行資料互動
現在混合開發比較普遍了,其實早就該學學了,只限於自己對html不是很熟,搭的介面太醜了,哈哈… 今天寫Demo的需求是這樣的 1、在一個介面裡,半面html,半面android原生控制元件。 2、點選html傳送html的資料給android ,andro
智慧對話機器人實戰開發(1)- 體系結構和分類
一、前言 人工智慧時代,以智慧對話機器人為最核心的技術應用方向之一,實現人機之間通過自然語言的溝通和交流,是智慧人機對話機器人的核心目標之一。尤其是Google Assistant 語音助手近期的卓越表現,如何開發智慧對話機器人系統,成為一個新的熱點。本文是系
IOS開發之Autolayout——“Content Compression Resistance”和“Content Hugging”
在使用storyboard進行UI佈局時,我們經常不經意間會注意到“Content Compression Resistance Priority”和“Content Hugging Priority”這兩個屬性。下面給大家簡單介紹下這兩個小傢伙:首先,我們得先來了解下另一個屬性intrinsic size(
OPC工作記錄整理——第四篇(OPC客戶端開發之OPC伺服器的列舉和連線)
OPC客戶端的開發相對來說,只要掌握了OPC基類的幾個介面,並知道它們是如何運作的,那麼開發起來還是相對容易的。好了,廢話不多說了,我們開始吧。 首先是對標頭檔案的引用: #include "stdafx.h" #include <afxcoll.h
iOS開發之限制只輸入數字和字母
首先,根據需求,定義一個巨集: #define NUM @"0123456789"//只輸入數字 #define ALPHA @"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"//只輸入字母 #def
安卓開發之使用雙程序守護和程序提權來實現服務程序保活
一、程序保活 在 如何讓我們的Android應用程序保活? 文章裡總結了一些程序保活方法,本文以雙程序守護和程序提權來保活我們的服務程序。 雙程序守護: 主要設計AB兩個不同服務程序,A程序的服務輪詢檢查B程序的服務是否存活,沒存活的話將其拉起,
安卓實戰開發之JNI從小白到偽老白深入瞭解JNI動態註冊native方法及JNI資料使用
前言 或許你知道了jni的簡單呼叫,其實不算什麼百度谷歌一大把,雖然這些jni絕大多數情況下都不會讓我們安卓工程師來弄,畢竟還是有點難,但是我們還是得打破砂鍋知道為什麼這樣幹吧,至少也讓我們知道呼叫流程和資料型別以及處理方法,或許你會有不一樣的發現。
Cocos2d—X遊戲開發之 CCLabelTTF 標籤詳解和對齊方式設定(分數顯示)(十六)
在Cocos2d—X遊戲開發中,CCLabelTTF 和 CCSprite 大概是使用最多的2個類了。標籤主要用於顯示靜態文字,可以設定字型的大小和位置等屬性。 現在,我們先來看下CCLabelTTF 的基本原始碼。 S1,從下面的程式碼可以看到 CCLabelTTF 繼
Lazarus實戰開發之串列埠通訊(WINCE/WIN32)
本文來自 http://blog.csdn.net/hellogv/ ,轉載必須註明出處!以下程式碼可到:http://download.csdn.net/source/611385 下載 Lazarus最吸引人的地方就是她的開發方式類似Delphi,支援超好用的RAD
Revit二次開發之WPF通過txt讀取和儲存TextBox的字串【附原始碼】
軟體版本:VS2015 Revit2018 功能:Revit中執行程式時,在Window中的TextBox中自動顯示指定txt檔案中的字串內容 缺點:會將txt檔案中的字串全部顯示 程式展示: 1.程式啟動 2.輸入“666”,點選Button1,自動關閉窗體 3