Flink入門寶典(詳細截圖版)
本文基於java構建Flink1.9版本入門程式,需要Maven 3.0.4 和 Java 8 以上版本。需要安裝Netcat進行簡單除錯。
這裡簡述安裝過程,並使用IDEA進行開發一個簡單流處理程式,本地除錯或者提交到Flink上執行,Maven與JDK安裝這裡不做說明。
一、Flink簡介
Flink誕生於歐洲的一個大資料研究專案StratoSphere。該專案是柏林工業大學的一個研究性專案。早期,Flink是做Batch計算的,但是在2014年,StratoSphere裡面的核心成員孵化出Flink,同年將Flink捐贈Apache,並在後來成為Apache的頂級大資料專案,同時Flink計算的主流方向被定位為Streaming,即用流式計算來做所有大資料的計算,這就是Flink技術誕生的背景。
2015開始阿里開始介入flink 負責對資源排程和流式sql的優化,成立了阿里內部版本blink在最近更新的1.9版本中,blink開始合併入flink,
未來flink也將支援java,scala,python等更多語言,並在機器學習領域施展拳腳。
二、Flink開發環境搭建
首先要想執行Flink,我們需要下載並解壓Flink的二進位制包,下載地址如下:https://flink.apache.org/downloads.html
我們可以選擇Flink與Scala結合版本,這裡我們選擇最新的1.9版本Apache Flink 1.9.0 for Scala 2.12進行下載。
Flink在Windows和Linux下的安裝與部署可以檢視 Flink快速入門--安裝與示例執行,這裡演示windows版。
安裝成功後,啟動cmd命令列視窗,進入flink資料夾,執行bin目錄下的start-cluster.bat
$ cd flink
$ cd bin
$ start-cluster.bat
Starting a local cluster with one JobManager process and one TaskManager process.
You can terminate the processes via CTRL-C in the spawned shell windows.
Web interface by default on http://localhost:8081/.
顯示啟動成功後,我們在瀏覽器訪問 http://localhost:8081/可以看到flink的管理頁面。
三、Flink快速體驗
請保證安裝好了flink,還需要Maven 3.0.4 和 Java 8 以上版本。這裡簡述Maven構建過程。
其他詳細構建方法歡迎檢視:快速構建第一個Flink工程
1、搭建Maven工程
使用Flink Maven Archetype構建一個工程。
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.0
你可以編輯自己的artifactId groupId
目錄結構如下:
$ tree quickstart/
quickstart/
├── pom.xml
└── src
└── main
├── java
│ └── org
│ └── myorg
│ └── quickstart
│ ├── BatchJob.java
│ └── StreamingJob.java
└── resources
└── log4j.properties
在pom中核心依賴:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
2、編寫程式碼
StreamingJob
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class StreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStreaming = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStreaming.print();
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for(String word : sentence.split(" ")){
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
3、除錯程式
安裝netcat工具進行簡單除錯。
啟動netcat 輸入:
nc -l 9999
啟動程式
在netcat中輸入幾個單詞 逗號分隔
在程式一端檢視結果
4、程式提交到Flink
啟動flink
windows為 start-cluster.bat linux為start-cluster.sh
localhost:8081檢視管理頁面
通過maven對程式碼打包
將打好的包提交到flink上
檢視log
tail -f log/flink-***-jobmanager.out
在netcat中繼續輸入單詞,在Running Jobs中檢視作業狀態,在log中檢視輸出。
四、Flink 程式設計模型
Flink提供不同級別的抽象來開發流/批處理應用程式。
最低階抽象只提供有狀態流。
在實踐中,大多數應用程式不需要上述低階抽象,而是針對Core API程式設計, 如DataStream API(有界/無界流)和DataSet API(有界資料集)。
Table Api聲明瞭一個表,遵循關係模型。
最高階抽象是SQL。
我們這裡只用到了DataStream API。
Flink程式的基本構建塊是流和轉換。
一個程式的基本構成:
l 獲取execution environment
l 載入/建立原始資料
l 指定這些資料的轉化方法
l 指定計算結果的存放位置
l 觸發程式執行
五、DataStreaming API使用
1、獲取execution environment
StreamExecutionEnvironment是所有Flink程式的基礎,獲取方法有:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String ... jarFiles)
一般情況下使用getExecutionEnvironment。如果你在IDE或者常規java程式中執行可以通過createLocalEnvironment建立基於本地機器的StreamExecutionEnvironment。如果你已經建立jar程式希望通過invoke方式獲取裡面的getExecutionEnvironment方法可以使用createRemoteEnvironment方式。
2、載入/建立原始資料
StreamExecutionEnvironment提供的一些訪問資料來源的介面
(1)基於檔案的資料來源
readTextFile(path)
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
(2)基於Socket的資料來源(本文使用的)
l socketTextStream
(3)基於Collection的資料來源
fromCollection(Collection)
fromCollection(Iterator, Class)
fromElements(T ...)
fromParallelCollection(SplittableIterator, Class)
generateSequence(from, to)
3、轉化方法
(1)Map方式:DataStream -> DataStream
功能:拿到一個element並輸出一個element,類似Hive中的UDF函式
舉例:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
(2)FlatMap方式:DataStream -> DataStream
功能:拿到一個element,輸出多個值,類似Hive中的UDTF函式
舉例:
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
(3)Filter方式:DataStream -> DataStream
功能:針對每個element判斷函式是否返回true,最後只保留返回true的element
舉例:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
(4)KeyBy方式:DataStream -> KeyedStream
功能:邏輯上將流分割成不相交的分割槽,每個分割槽都是相同key的元素
舉例:
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
(5)Reduce方式:KeyedStream -> DataStream
功能:在keyed data stream中進行輪訓reduce。
舉例:
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
(6)Aggregations方式:KeyedStream -> DataStream
功能:在keyed data stream中進行聚合操作
舉例:
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
(7)Window方式:KeyedStream -> WindowedStream
功能:在KeyedStream中進行使用,根據某個特徵針對每個key用windows進行分組。
舉例:
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
(8)WindowAll方式:DataStream -> AllWindowedStream
功能:在DataStream中根據某個特徵進行分組。
舉例:
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
(9)Union方式:DataStream* -> DataStream
功能:合併多個數據流成一個新的資料流
舉例:
dataStream.union(otherStream1, otherStream2, ...);
(10)Split方式:DataStream -> SplitStream
功能:將流分割成多個流
舉例:
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
(11)Select方式:SplitStream -> DataStream
功能:從split stream中選擇一個流
舉例:
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
4、輸出資料
writeAsText()
writeAsCsv(...)
print() / printToErr()
writeUsingOutputFormat() / FileOutputFormat
writeToSocket
addSink
更多Flink相關原理:
穿梭時空的實時計算框架——Flink對時間的處理
大資料實時處理的王者-Flink
統一批處理流處理——Flink批流一體實現原理
Flink快速入門--安裝與示例執行
快速構建第一個Flink工程
更多實時計算,Flink,Kafka等相關技術博文,歡迎關注實時流式計算:
相關推薦
Flink入門寶典(詳細截圖版)
本文基於java構建Flink1.9版本入門程式,需要Maven 3.0.4 和 Java 8 以上版本。需要安裝Netcat進行簡單除錯。 這裡簡述安裝過程,並使用IDEA進行開發一個簡單流處理程式,本地除錯或者提交到Flink上執行,Maven與JDK安裝這裡不做說明。 一、Flink簡介 Fli
Kafka入門寶典(詳細截圖版)
1、瞭解 Apache Kafka 1.1、簡介 官網:http://kafka.apache.org/ Apache Kafka 是一個開源訊息系統,由Scala 寫成。是由Apache 軟體基金會開發的一個開源訊息系統專案。 Kafka 最初是由LinkedIn 開發,並於2011 年初開源。2
IT(計算機/軟件/互聯網)專業詞匯寶典(持續更新中)
hub point charger 中國 mar asi lose 社區 less 1.Stack Overflow:http://stackoverflow.com/ .一個著名的IT技術的問答站點。全然免費。程序猿必知。2.programmer:程序猿3.e
程式設計師面試寶典(第5版)
網站 更多書籍點選進入>> CiCi島 下載 電子版僅供預覽及學習交流使用,下載後請24小時內刪除,支援正版,喜歡的請購買正版書籍 電子書下載(皮皮雲盤-點選“普通下載”) 購買正版 封頁 編輯推薦 揭開知名IT企業面試、筆試
Java程式設計師面試寶典(第4版)
網站 更多書籍點選進入>> CiCi島 下載 電子版僅供預覽及學習交流使用,下載後請24小時內刪除,支援正版,喜歡的請購買正版書籍 電子書下載(皮皮雲盤-點選“普通下載”) 購買正版 封頁 編輯推薦 揭開知名IT企業面試、筆試
優動漫PAINT實用寶典(圖層篇)——柵格圖層
您在使用優動漫PAINT描畫影象時,是否都是從“新建圖層”開始的呢?瞭解圖層的型別及其不同圖層之間的差異,更易於使用優動漫PAINT作畫。優動漫PAINT入門寶典(圖層篇)將向各位小夥伴們展示不同圖層的特點,以及其使用方法。本次為大家介紹的是柵格圖層,下一篇將向大家介紹向量圖
opengl超級寶典(第五版)閱讀筆記 8 模型檢視投影矩陣
從這裡開始就進入了opengl最關鍵的部分了。 1.構造平移矩陣 m3dTranslationMatrix44(mTranslate, 0.0f, 0.0f, -2.5f); 第一個引數是4x4的矩陣,後面是位移向量 2.構造旋轉矩陣 m3dRotationMatrix44(m
opengl超級寶典(第五版)閱讀筆記 7 抗鋸齒
在圖形光柵化的時候,難免會出現很多鋸齒現象,如下圖所示: 可以看到很明顯的一段一段的鋸齒。 opengl中通過把該點畫素與周圍畫素相混合來優化鋸齒現象,方法如下: glBlendFunc(GL_SRC_ALPHA, GL_ONE_MINUS_SRC_ALPHA);//設定混合模式
opengl超級寶典(第五版)閱讀筆記 6 幾何圖形繪製細節
1.剔除背面 如果不剔除物體的背面的話,3D圖形的正面和背面會同時顯示,如下圖所示: 通過glEnable(GL_CULL_FACE);來剔除背面 2.深度測試 通過開啟深度測試,能保證在物體後面的圖形不會被渲染出來,通過glEnable(GL_DEPTH_TEST);開啟 3.三種填充模
opengl超級寶典(第五版)閱讀筆記 5 混合
這裡只提一種最基本的混合,其他的混合方式可以參考書中的表格 混合程式碼如下: glEnable(GL_BLEND); glBlendFunc(GL_SRC_ALPHA, GL_ONE_MINUS_SRC_ALPHA); shaderManager.UseStockShader(GL
opengl超級寶典(第五版)閱讀筆記 4 裁剪
通過glScissor(100, 100, 600, 400)函式可以設定裁剪區域,引數分別為左下角和右上角的座標 當然,別忘記要開啟裁剪測試glEnable(GL_SCISSOR_TEST); #include <GLTools.h> // OpenGL toolkit
opengl超級寶典(第五版)閱讀筆記 3 基本圖元的使用
基本圖元有 GL_POINTS GL_LINES GL_LINE_STRIP GL_LINE_LOOP GL_TRIANGLES GL_TRIANGLE_LOOP GL_TRIANGLE_FAN. 後面將一 一對其進行簡單的介紹 1.GL_POINTS I. 建立點集 GLfloat v
opengl超級寶典(第五版)閱讀筆記 1 基本程式框架
配置環境部分其實還是有點煩,網上資料有很多,耐心點問題也不大。 下面也算是opengl的hello world了,寫了比較詳細的註釋。 值得注意的是#pragma comment(lib,“gltools.lib”)這一行,書中是沒有的,可能因為環境配置方法的不同,我必須要手動連結一下glt
VS2012 中完整配置OpenGL超級寶典(第五版)編譯環境
在接觸OpenGL中,配置顯得相當麻煩,特別是在VS2012下配置時,存在許多問題,而網上的很多方法僅僅適用於VS2008,甚至僅適用於VC6.0,筆者經過自身的實踐,參考了許多網上的資料,總結了一下配置的方法,當然這僅僅是筆者的個人理解,筆者個人水平有限,因此未必是萬能
程式設計師面試寶典(第三版)——單鏈表的基本操作:建立,求長度,輸出,排序,插入,刪除,逆置
程式設計實現一個單鏈表的建立,求單鏈表的長度,列印輸出單鏈表,對單鏈表進行排序,插入元素,刪除元素,對單鏈表進行逆置。 我是借鑑參考資料,然後自己寫規範,對函式都進行了呼叫,每一次呼叫,都有輸出單鏈表。程式完整,已除錯執行。 源程式: #include<iostrea
《Java程式設計師面試寶典(第4版)》試讀感想
作為一名java程式設計師,已經有幾年經驗了,但是試讀章節的題目在看答案之前也自己做了一下,基本沒有做對一道題目,雖然有經驗,但是基礎的東西在平時工作用的少,或者一些實現方式或寫法根本沒有這樣寫過,所以這些題目答錯在所難免了。 面試寶典,顧名思義它的核心在於面試,往往
OpenGL超級寶典(第五版) 環境配置
特別提醒:有些在word中或者其他中的程式碼複製到vs中會報錯,原因是word中有些隱含的字元,複製到vs中就會報錯;重新輸一遍就可以解決問題,這裡只是提醒下! 可以參閱我前面轉載的一篇文章,進行比較然後來配置,本人蔘照這兩篇,成執行,算是學習opengl的開始吧;
零基礎新手的Python入門實戰寶典(二) —— 從哪裡開始?(搭建Python開發環境,Python + Pycharm)
如果你之前看過其他教程,但是發現雲裡霧裡複雜的讓你頭暈眼花的話,沒錯,看這裡,本系列Python教程專為啥都不會的新手使用者打造,放寬心,大膽看,我就是說說書,你就當聽聽故事,輕鬆愉快走進程式設計的大門,“程式設計”不再神祕也不再遙不可及。只要你會最基本的電腦操
零基礎新手的Python入門實戰寶典(一) —— Python都能幹些啥?(Python的用途)
如果你之前看過其他教程,但是發現雲裡霧裡複雜的讓你頭暈眼花的話,沒錯,看這裡,本系列Python教程專為啥都不會的新手使用者打造,放寬心,大膽看,我就是說說書,你就當聽聽故事,輕鬆愉快走進程式設計的大門,“程式設計”不再神祕也不再遙不可及。只要你會最基本的電腦操
Flink從入門到入土(詳細教程)
和其他所有的計算框架一樣,flink也有一些基礎的開發步驟以及基礎,核心的API,從開發步驟的角度來講,主要分為四大部分 1.Environment Flink Job在提交執行計算時,需要首先建立和Flink框架之間的聯絡,也就指的是當前的flink執行環境,只有獲取了環境資訊,才能將task排程到不同