1. 程式人生 > >flink實戰開發之Join和coGroup的區別和應用

flink實戰開發之Join和coGroup的區別和應用

簡介

Join和coGroup都是flinkSQL中用於連線多個流的運算元,但是有一定的區別,推薦能使用coGroup不要使用Join,因為coGroup更強大。下面讓我們簡單看一下兩個運算元的用法

Window Join
DataStream,DataStream→DataStream

在給定金鑰和公共視窗上連線兩個資料流。

dataStream.join(otherStream)

    .where(<key selector>).equalTo(<key selector>)

    .window(TumblingEventTimeWindows.of(Time.seconds(3)))

    .apply { ... }

Window CoGroup
DataStream,DataStream→DataStream

在給定金鑰和公共視窗上對兩個資料流進行Cogroup。

dataStream.coGroup(otherStream)

    .where(0).equalTo(1)

    .window(TumblingEventTimeWindows.of(Time.seconds(3)))

    .apply {}

 CoGrouped原始碼

joinedStream原始碼

對比結論

結論一:.使用時,where裡面要傳入資料流T1要與資料流T2匹配的key,equalTo中傳入T2要和T1匹配的key。

結論二:join和coGroup的最大區別就是apply方法提供的引數型別不一樣,

join的apply

coGroup的apply引數

兩種運算元apply方法中的引數型別不一樣,join中提供的apply方法,引數是T1與T2泛型型別。而coGroup中提供的apply方法,引數是Iterator[T1]與Iterator[2]迭代器,基於這2種方式,如果我們的邏輯不僅僅是對一條record做處理,而是要與上一record或更復雜的判斷與比較,甚至是對結果排序,那麼join中的apply方法顯得比較困難,所以推薦使用coGroup

。 
結論三:apply方法的好處

我們想要在Flink中實現實時的流計算,就可以通過joinedStream或coGroupedStream來實現。但是在join之後實施更復雜的運算,例如判斷、迭代等,僅僅通過SQL實現,恐怕會很麻煩,只能通過PL/SQL塊來實現,而Flink提供了apply方法,使用者可以自己編寫複雜的函式來實現。

案例 雙流jion

package flinkSQL
import java.text.SimpleDateFormat
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._

/**
  * Created by  ${WangZhiHua} on 2018/11/13
  */
object JoinDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //獲取介面傳送的資料
    val dataStream1 = env.readTextFile("C:/flink_data/scoket1.txt")
    val dataStream2 = env.readTextFile("C:/flink_data/scoket2.txt")
    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
   //使用樣例類StockTransaction封裝獲取的資料
    val dataStreamMap1 = dataStream1.map(f => {
      val tokens1 = f.split(",")
      StockTransaction(tokens1(0), tokens1(1), tokens1(2).toDouble)
    })
      .assignAscendingTimestamps(f => format.parse(f.tx_time).getTime)
    //使用樣例類StockSnapshot封裝獲取的資料
    val dataStreamMap2 = dataStream2.map(f => {
      val tokens2 = f.split(",")
      StockSnapshot(tokens2(0), tokens2(1), tokens2(2).toDouble)
    })
      .assignAscendingTimestamps(f => format.parse(f.md_time).getTime)

    /**
      * 進行雙流join
      * 限定範圍是:3秒鐘的Event time時間視窗
      */

    val joinStream = dataStreamMap1.coGroup(dataStreamMap2)
      .where(_.tx_code)
      .equalTo(_.md_code)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))

      val innerJoinStream = joinStream.apply(new InnerJoinFunction)
     innerJoinStream.name("innerJoin").print()
    print("===================== end =========================")
    env.execute("join demo")
  }
}
//定義樣例類封裝接收的資料
case class StockTransaction(tx_time:String, tx_code:String,tx_value:Double)
case class StockSnapshot(md_time:String, md_code:String,md_value:Double)
//定義一個內連線函式,繼承CoCroup
class InnerJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)]{
  override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double,String)]): Unit = {
    /**
      * 將Java中的Iterable物件轉換為Scala的Iterable
      * scala的集合操作效率高,簡潔
      */
    import scala.collection.JavaConverters._
    val scalaT1 = T1.asScala.toList
    val scalaT2 = T2.asScala.toList

    /**
      * Inner Join要比較的是同一個key下,同一個時間視窗內的資料
      */
    if(scalaT1.nonEmpty && scalaT2.nonEmpty){
      for(transaction <- scalaT1){
        for(snapshot <- scalaT2){
          out.collect(transaction.tx_code,transaction.tx_time, snapshot.md_time,transaction.tx_value,snapshot.md_value,"Inner Join Test")
        }
      }
    }
  }
}
class LeftJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {
  override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {
    /**
      * 將Java中的Iterable物件轉換為Scala的Iterable
      * scala的集合操作效率高,簡潔
      */
    import scala.collection.JavaConverters._
    val scalaT1 = T1.asScala.toList
    val scalaT2 = T2.asScala.toList

    /**
      * Left Join要比較的是同一個key下,同一個時間視窗內的資料
      */
    if(scalaT1.nonEmpty && scalaT2.isEmpty){
      for(transaction <- scalaT1){
        out.collect(transaction.tx_code,transaction.tx_time, "",transaction.tx_value,0,"Left Join Test")
      }
    }
  }
}
class RightJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {
  override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {
    /**
      * 將Java中的Iterable物件轉換為Scala的Iterable
      * scala的集合操作效率高,簡潔
      */
    import scala.collection.JavaConverters._
    val scalaT1 = T1.asScala.toList
    val scalaT2 = T2.asScala.toList

    /**
      * Right Join要比較的是同一個key下,同一個時間視窗內的資料
      */
    if(scalaT1.isEmpty && scalaT2.nonEmpty){
      for(snapshot <- scalaT2){
        out.collect(snapshot.md_code, "",snapshot.md_time,0,snapshot.md_value,"Right Join Test")
      }
    }
  }
}

相關推薦

flink實戰開發JoincoGroup區別應用

簡介 Join和coGroup都是flinkSQL中用於連線多個流的運算元,但是有一定的區別,推薦能使用coGroup不要使用Join,因為coGroup更強大。下面讓我們簡單看一下兩個運算元的用法 Window Join DataStream,DataSt

開發日常小結(38):MYSQL: left join / right join / join 的使用區別聯絡

2018年10月19日 目錄 1、JOIN 的概念 有時為了得到完整的結果,我們需要從兩個或更多的表中獲取結果。我們就需要執行 join。 SQL join 用於根

JavaJSONObject存取值以及HashMap區別, optString()getString()區別他的遍歷方式

結論: 1.JSONObject和HashMap用法上是一樣的,用put()方法存對於的Key-values鍵值對,取可用optString(key)和getString(key),get(key),存入的是什麼型別,取出來的時候就是什麼型別 2**.optString()在沒找到k

flink實戰開發----flinkSQL入門大全

flinkSQL概念介紹 Table API & SQL Apache Flink具有兩個關係API - 表API和SQL - 用於統一流和批處理。Table API是Scala和Java的語言整合查詢API,允許以非常直觀的方式組合來自關係運算符的查詢,Table API和SQ

django 開發模型以及靜態問題圖片的使用

使用Django的模型,基本步驟: 1.建立model 2.加入到admin.py中去 3.執行生成遷移:python manage.py makemigrations blog 4.執行遷移,生成表單   需要注意的是第二步,一定要加入APP相應的adimn.py 上去,再執行遷移

Flink與SparkStreamingCounters& Accumulators累加器雙向應用案例實戰-Flink牛刀小試

版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何問題,可隨時聯絡。 1 累加器應用場

Android開發獲取SIM卡資訊手機號碼

獲取SIM卡資訊和手機號碼的工具類,記錄一下方便以後使用 import android.content.Context; import android.telephony.TelephonyManager; /** * Created by WangJinyong on 20

大資料開發Hadoop篇----配置yarnmapreduce

上一篇部落格中我們已經完成了hdfs的部署,現在我們開始部署yarn了。 我們先使用jps命令來檢視下現在與java相關的程序: 這裡NameNode以後簡稱為NN,DataNode簡稱為DN,而SecondaryNameNodel簡稱為SNN。 我們先切換到had

Python實戰開發Pyramid Web框架在商城專案中的應用教程

第一講:Python基礎和入門介紹(Web開發基礎) 介紹Web開發技術和Python語言的基礎知識。通過本講的學習,學員可以瞭解Python Web開發技術的基礎和相關常識,為後續學習打下基礎。 第二講:Python語言基礎(運算子與表示式,控制流,函式,模組) 在第一講的基礎上繼續深入學習Pyth

R語言開發平均值,中位數眾數了解下

R中的統計分析通過使用許多內建函式來執行的,這些函式大部分是R基礎包的一部分,並且它們將R向量與引數一起作為輸入,並在執行計算後給出結果。 先來看如何求平均值。 平均值是通過取數值的總和併除以資料序列中的值的數量來計算,函式mean()用於在R中計算平均值,語法如下:

混合開發webView載入html,android html之間進行資料互動

現在混合開發比較普遍了,其實早就該學學了,只限於自己對html不是很熟,搭的介面太醜了,哈哈… 今天寫Demo的需求是這樣的 1、在一個介面裡,半面html,半面android原生控制元件。 2、點選html傳送html的資料給android ,andro

智慧對話機器人實戰開發(1)- 體系結構分類

一、前言 人工智慧時代,以智慧對話機器人為最核心的技術應用方向之一,實現人機之間通過自然語言的溝通和交流,是智慧人機對話機器人的核心目標之一。尤其是Google Assistant 語音助手近期的卓越表現,如何開發智慧對話機器人系統,成為一個新的熱點。本文是系

IOS開發Autolayout——“Content Compression Resistance”“Content Hugging”

在使用storyboard進行UI佈局時,我們經常不經意間會注意到“Content Compression Resistance Priority”和“Content Hugging Priority”這兩個屬性。下面給大家簡單介紹下這兩個小傢伙:首先,我們得先來了解下另一個屬性intrinsic size(

OPC工作記錄整理——第四篇(OPC客戶端開發OPC伺服器的列舉連線)

    OPC客戶端的開發相對來說,只要掌握了OPC基類的幾個介面,並知道它們是如何運作的,那麼開發起來還是相對容易的。好了,廢話不多說了,我們開始吧。     首先是對標頭檔案的引用: #include "stdafx.h" #include <afxcoll.h

iOS開發限制只輸入數字字母

首先,根據需求,定義一個巨集: #define NUM @"0123456789"//只輸入數字 #define ALPHA @"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"//只輸入字母 #def

安卓開發使用雙程序守護程序提權來實現服務程序保活

一、程序保活 在 如何讓我們的Android應用程序保活? 文章裡總結了一些程序保活方法,本文以雙程序守護和程序提權來保活我們的服務程序。 雙程序守護: 主要設計AB兩個不同服務程序,A程序的服務輪詢檢查B程序的服務是否存活,沒存活的話將其拉起,

安卓實戰開發JNI從小白到偽老白深入瞭解JNI動態註冊native方法及JNI資料使用

前言 或許你知道了jni的簡單呼叫,其實不算什麼百度谷歌一大把,雖然這些jni絕大多數情況下都不會讓我們安卓工程師來弄,畢竟還是有點難,但是我們還是得打破砂鍋知道為什麼這樣幹吧,至少也讓我們知道呼叫流程和資料型別以及處理方法,或許你會有不一樣的發現。

Cocos2d—X遊戲開發 CCLabelTTF 標籤詳解對齊方式設定(分數顯示)(十六)

在Cocos2d—X遊戲開發中,CCLabelTTF 和 CCSprite 大概是使用最多的2個類了。標籤主要用於顯示靜態文字,可以設定字型的大小和位置等屬性。 現在,我們先來看下CCLabelTTF 的基本原始碼。 S1,從下面的程式碼可以看到 CCLabelTTF 繼

Lazarus實戰開發串列埠通訊(WINCE/WIN32)

本文來自 http://blog.csdn.net/hellogv/ ,轉載必須註明出處!以下程式碼可到:http://download.csdn.net/source/611385 下載    Lazarus最吸引人的地方就是她的開發方式類似Delphi,支援超好用的RAD

Revit二次開發WPF通過txt讀取儲存TextBox的字串【附原始碼】

軟體版本:VS2015 Revit2018 功能:Revit中執行程式時,在Window中的TextBox中自動顯示指定txt檔案中的字串內容 缺點:會將txt檔案中的字串全部顯示 程式展示: 1.程式啟動 2.輸入“666”,點選Button1,自動關閉窗體 3