1. 程式人生 > >Flink入門寶典(詳細截圖版)

Flink入門寶典(詳細截圖版)


本文基於java構建Flink1.9版本入門程式,需要Maven 3.0.4 和 Java 8 以上版本。需要安裝Netcat進行簡單除錯。

這裡簡述安裝過程,並使用IDEA進行開發一個簡單流處理程式,本地除錯或者提交到Flink上執行,Maven與JDK安裝這裡不做說明。

一、Flink簡介

Flink誕生於歐洲的一個大資料研究專案StratoSphere。該專案是柏林工業大學的一個研究性專案。早期,Flink是做Batch計算的,但是在2014年,StratoSphere裡面的核心成員孵化出Flink,同年將Flink捐贈Apache,並在後來成為Apache的頂級大資料專案,同時Flink計算的主流方向被定位為Streaming,即用流式計算來做所有大資料的計算,這就是Flink技術誕生的背景。

2015開始阿里開始介入flink 負責對資源排程和流式sql的優化,成立了阿里內部版本blink在最近更新的1.9版本中,blink開始合併入flink,

未來flink也將支援java,scala,python等更多語言,並在機器學習領域施展拳腳。

二、Flink開發環境搭建

首先要想執行Flink,我們需要下載並解壓Flink的二進位制包,下載地址如下:https://flink.apache.org/downloads.html

我們可以選擇Flink與Scala結合版本,這裡我們選擇最新的1.9版本Apache Flink 1.9.0 for Scala 2.12進行下載。

Flink在Windows和Linux下的安裝與部署可以檢視 Flink快速入門--安裝與示例執行,這裡演示windows版。

安裝成功後,啟動cmd命令列視窗,進入flink資料夾,執行bin目錄下的start-cluster.bat

$ cd flink
$ cd bin
$ start-cluster.bat
Starting a local cluster with one JobManager process and one TaskManager process.
You can terminate the processes via CTRL-C in the spawned shell windows.
Web interface by default on http://localhost:8081/.

顯示啟動成功後,我們在瀏覽器訪問 http://localhost:8081/可以看到flink的管理頁面。

三、Flink快速體驗

請保證安裝好了flink,還需要Maven 3.0.4 和 Java 8 以上版本。這裡簡述Maven構建過程。

其他詳細構建方法歡迎檢視:快速構建第一個Flink工程

1、搭建Maven工程

使用Flink Maven Archetype構建一個工程。

 $ mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.9.0

你可以編輯自己的artifactId groupId

目錄結構如下:

$ tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── org
        │       └── myorg
        │           └── quickstart
        │               ├── BatchJob.java
        │               └── StreamingJob.java
        └── resources
            └── log4j.properties

在pom中核心依賴:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

2、編寫程式碼

StreamingJob

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class StreamingJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStreaming = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStreaming.print();

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for(String word : sentence.split(" ")){
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }

    }
}

3、除錯程式

安裝netcat工具進行簡單除錯。

啟動netcat 輸入:

nc -l 9999

啟動程式

在netcat中輸入幾個單詞 逗號分隔

在程式一端檢視結果

啟動flink

windows為 start-cluster.bat    linux為start-cluster.sh

localhost:8081檢視管理頁面

通過maven對程式碼打包

將打好的包提交到flink上

檢視log

tail -f log/flink-***-jobmanager.out

在netcat中繼續輸入單詞,在Running Jobs中檢視作業狀態,在log中檢視輸出。

Flink提供不同級別的抽象來開發流/批處理應用程式。

最低階抽象只提供有狀態流。

在實踐中,大多數應用程式不需要上述低階抽象,而是針對Core API程式設計, 如DataStream API(有界/無界流)和DataSet API(有界資料集)。

Table Api聲明瞭一個表,遵循關係模型。

最高階抽象是SQL。

我們這裡只用到了DataStream API。

Flink程式的基本構建塊是流和轉換。

一個程式的基本構成:

l 獲取execution environment

l 載入/建立原始資料

l 指定這些資料的轉化方法

l 指定計算結果的存放位置

l 觸發程式執行

五、DataStreaming API使用

1、獲取execution environment

StreamExecutionEnvironment是所有Flink程式的基礎,獲取方法有:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String ... jarFiles)

一般情況下使用getExecutionEnvironment。如果你在IDE或者常規java程式中執行可以通過createLocalEnvironment建立基於本地機器的StreamExecutionEnvironment。如果你已經建立jar程式希望通過invoke方式獲取裡面的getExecutionEnvironment方法可以使用createRemoteEnvironment方式。

2、載入/建立原始資料

StreamExecutionEnvironment提供的一些訪問資料來源的介面

(1)基於檔案的資料來源

readTextFile(path)
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

(2)基於Socket的資料來源(本文使用的)

l socketTextStream

 

(3)基於Collection的資料來源

fromCollection(Collection)
fromCollection(Iterator, Class)
fromElements(T ...)
fromParallelCollection(SplittableIterator, Class)
generateSequence(from, to)

3、轉化方法

(1)Map方式:DataStream -> DataStream

功能:拿到一個element並輸出一個element,類似Hive中的UDF函式

舉例:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

(2)FlatMap方式:DataStream -> DataStream

功能:拿到一個element,輸出多個值,類似Hive中的UDTF函式

舉例:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

(3)Filter方式:DataStream -> DataStream

功能:針對每個element判斷函式是否返回true,最後只保留返回true的element

舉例:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

(4)KeyBy方式:DataStream -> KeyedStream

功能:邏輯上將流分割成不相交的分割槽,每個分割槽都是相同key的元素

舉例:

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

(5)Reduce方式:KeyedStream -> DataStream

功能:在keyed data stream中進行輪訓reduce。

舉例:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

(6)Aggregations方式:KeyedStream -> DataStream

功能:在keyed data stream中進行聚合操作

舉例:

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

(7)Window方式:KeyedStream -> WindowedStream

功能:在KeyedStream中進行使用,根據某個特徵針對每個key用windows進行分組。

舉例:

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

(8)WindowAll方式:DataStream -> AllWindowedStream

功能:在DataStream中根據某個特徵進行分組。

舉例:

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

(9)Union方式:DataStream* -> DataStream

功能:合併多個數據流成一個新的資料流

舉例:

dataStream.union(otherStream1, otherStream2, ...);

(10)Split方式:DataStream -> SplitStream

功能:將流分割成多個流

舉例:

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

(11)Select方式:SplitStream -> DataStream

功能:從split stream中選擇一個流

舉例:

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");

4、輸出資料

writeAsText()
writeAsCsv(...)
print() / printToErr() 
writeUsingOutputFormat() / FileOutputFormat
writeToSocket
addSink

更多Flink相關原理:

穿梭時空的實時計算框架——Flink對時間的處理

大資料實時處理的王者-Flink

統一批處理流處理——Flink批流一體實現原理

Flink快速入門--安裝與示例執行

快速構建第一個Flink工程

更多實時計算,Flink,Kafka等相關技術博文,歡迎關注實時流式計算:

相關推薦

Flink入門詳細圖版

本文基於java構建Flink1.9版本入門程式,需要Maven 3.0.4 和 Java 8 以上版本。需要安裝Netcat進行簡單除錯。 這裡簡述安裝過程,並使用IDEA進行開發一個簡單流處理程式,本地除錯或者提交到Flink上執行,Maven與JDK安裝這裡不做說明。 一、Flink簡介 Fli

Kafka入門詳細圖版

1、瞭解 Apache Kafka 1.1、簡介 官網:http://kafka.apache.org/ Apache Kafka 是一個開源訊息系統,由Scala 寫成。是由Apache 軟體基金會開發的一個開源訊息系統專案。 Kafka 最初是由LinkedIn 開發,並於2011 年初開源。2

IT(計算機/軟件/互聯網)專業詞匯持續更新中

hub point charger 中國 mar asi lose 社區 less 1.Stack Overflow:http://stackoverflow.com/ .一個著名的IT技術的問答站點。全然免費。程序猿必知。2.programmer:程序猿3.e

程式設計師面試第5版

網站 更多書籍點選進入>> CiCi島 下載 電子版僅供預覽及學習交流使用,下載後請24小時內刪除,支援正版,喜歡的請購買正版書籍 電子書下載(皮皮雲盤-點選“普通下載”) 購買正版 封頁 編輯推薦 揭開知名IT企業面試、筆試

Java程式設計師面試第4版

網站 更多書籍點選進入>> CiCi島 下載 電子版僅供預覽及學習交流使用,下載後請24小時內刪除,支援正版,喜歡的請購買正版書籍 電子書下載(皮皮雲盤-點選“普通下載”) 購買正版 封頁 編輯推薦 揭開知名IT企業面試、筆試

優動漫PAINT實用圖層篇——柵格圖層

您在使用優動漫PAINT描畫影象時,是否都是從“新建圖層”開始的呢?瞭解圖層的型別及其不同圖層之間的差異,更易於使用優動漫PAINT作畫。優動漫PAINT入門寶典(圖層篇)將向各位小夥伴們展示不同圖層的特點,以及其使用方法。本次為大家介紹的是柵格圖層,下一篇將向大家介紹向量圖

opengl超級第五版閱讀筆記 8 模型檢視投影矩陣

從這裡開始就進入了opengl最關鍵的部分了。 1.構造平移矩陣 m3dTranslationMatrix44(mTranslate, 0.0f, 0.0f, -2.5f); 第一個引數是4x4的矩陣,後面是位移向量 2.構造旋轉矩陣 m3dRotationMatrix44(m

opengl超級第五版閱讀筆記 7 抗鋸齒

在圖形光柵化的時候,難免會出現很多鋸齒現象,如下圖所示: 可以看到很明顯的一段一段的鋸齒。 opengl中通過把該點畫素與周圍畫素相混合來優化鋸齒現象,方法如下: glBlendFunc(GL_SRC_ALPHA, GL_ONE_MINUS_SRC_ALPHA);//設定混合模式

opengl超級第五版閱讀筆記 6 幾何圖形繪製細節

1.剔除背面 如果不剔除物體的背面的話,3D圖形的正面和背面會同時顯示,如下圖所示: 通過glEnable(GL_CULL_FACE);來剔除背面 2.深度測試 通過開啟深度測試,能保證在物體後面的圖形不會被渲染出來,通過glEnable(GL_DEPTH_TEST);開啟 3.三種填充模

opengl超級第五版閱讀筆記 5 混合

這裡只提一種最基本的混合,其他的混合方式可以參考書中的表格 混合程式碼如下: glEnable(GL_BLEND); glBlendFunc(GL_SRC_ALPHA, GL_ONE_MINUS_SRC_ALPHA); shaderManager.UseStockShader(GL

opengl超級第五版閱讀筆記 4 裁剪

通過glScissor(100, 100, 600, 400)函式可以設定裁剪區域,引數分別為左下角和右上角的座標 當然,別忘記要開啟裁剪測試glEnable(GL_SCISSOR_TEST); #include <GLTools.h> // OpenGL toolkit

opengl超級第五版閱讀筆記 3 基本圖元的使用

基本圖元有 GL_POINTS GL_LINES GL_LINE_STRIP GL_LINE_LOOP GL_TRIANGLES GL_TRIANGLE_LOOP GL_TRIANGLE_FAN. 後面將一 一對其進行簡單的介紹 1.GL_POINTS I. 建立點集 GLfloat v

opengl超級第五版閱讀筆記 1 基本程式框架

配置環境部分其實還是有點煩,網上資料有很多,耐心點問題也不大。 下面也算是opengl的hello world了,寫了比較詳細的註釋。 值得注意的是#pragma comment(lib,“gltools.lib”)這一行,書中是沒有的,可能因為環境配置方法的不同,我必須要手動連結一下glt

VS2012 中完整配置OpenGL超級第五版編譯環境

在接觸OpenGL中,配置顯得相當麻煩,特別是在VS2012下配置時,存在許多問題,而網上的很多方法僅僅適用於VS2008,甚至僅適用於VC6.0,筆者經過自身的實踐,參考了許多網上的資料,總結了一下配置的方法,當然這僅僅是筆者的個人理解,筆者個人水平有限,因此未必是萬能

程式設計師面試第三版——單鏈表的基本操作:建立,求長度,輸出,排序,插入,刪除,逆置

程式設計實現一個單鏈表的建立,求單鏈表的長度,列印輸出單鏈表,對單鏈表進行排序,插入元素,刪除元素,對單鏈表進行逆置。 我是借鑑參考資料,然後自己寫規範,對函式都進行了呼叫,每一次呼叫,都有輸出單鏈表。程式完整,已除錯執行。 源程式: #include<iostrea

《Java程式設計師面試第4版》試讀感想

          作為一名java程式設計師,已經有幾年經驗了,但是試讀章節的題目在看答案之前也自己做了一下,基本沒有做對一道題目,雖然有經驗,但是基礎的東西在平時工作用的少,或者一些實現方式或寫法根本沒有這樣寫過,所以這些題目答錯在所難免了。           面試寶典,顧名思義它的核心在於面試,往往

OpenGL超級第五版 環境配置

特別提醒:有些在word中或者其他中的程式碼複製到vs中會報錯,原因是word中有些隱含的字元,複製到vs中就會報錯;重新輸一遍就可以解決問題,這裡只是提醒下!  可以參閱我前面轉載的一篇文章,進行比較然後來配置,本人蔘照這兩篇,成執行,算是學習opengl的開始吧;

零基礎新手的Python入門實戰 —— 從哪裡開始?搭建Python開發環境,Python + Pycharm

如果你之前看過其他教程,但是發現雲裡霧裡複雜的讓你頭暈眼花的話,沒錯,看這裡,本系列Python教程專為啥都不會的新手使用者打造,放寬心,大膽看,我就是說說書,你就當聽聽故事,輕鬆愉快走進程式設計的大門,“程式設計”不再神祕也不再遙不可及。只要你會最基本的電腦操

零基礎新手的Python入門實戰 —— Python都能幹些啥?Python的用途

如果你之前看過其他教程,但是發現雲裡霧裡複雜的讓你頭暈眼花的話,沒錯,看這裡,本系列Python教程專為啥都不會的新手使用者打造,放寬心,大膽看,我就是說說書,你就當聽聽故事,輕鬆愉快走進程式設計的大門,“程式設計”不再神祕也不再遙不可及。只要你會最基本的電腦操

Flink入門到入土詳細教程

和其他所有的計算框架一樣,flink也有一些基礎的開發步驟以及基礎,核心的API,從開發步驟的角度來講,主要分為四大部分 1.Environment Flink Job在提交執行計算時,需要首先建立和Flink框架之間的聯絡,也就指的是當前的flink執行環境,只有獲取了環境資訊,才能將task排程到不同