02. Flink Single-Node WordCount and Cluster Installation
I. Single-Node Installation
1. Prepare the installation package
Copy out the package built from source (for build instructions see the previous post, 01.Flink筆記-編譯、部署) or download the binary package from the Flink website.
2. Configuration
Prerequisite: JDK 1.8+
Edit the configuration file flink-conf.yaml:

```yaml
# Flink's web UI defaults to port 8081; change it if another service conflicts
rest.port: 18081
```

Leave the remaining settings at their defaults.
3. Startup
- Linux:

```bash
./bin/start-cluster.sh
```

- Windows:

```bat
cd bin
start-cluster.bat
```
Local startup on Windows looks like the following (screenshot omitted).
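Once the scripts report success, the web UI should be reachable at http://localhost:18081 (the rest.port value configured above; the default would be http://localhost:8081).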
II. Single-Node WordCount
1. Java version
```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @author qinglanmei
 * @date Created in 2019/4/10 11:10
 * @description Java windowed WordCount example for Flink
 */
public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.has("port") ? params.getInt("port") : 9999;
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount "
                    + "--hostname <hostname> --port <port>', where hostname (localhost by default) "
                    + "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and "
                    + "type the input text into the command line");
            return;
        }

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream(hostname, port)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5), Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("WindowWordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
```
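A note on running it: with no program arguments the job falls back to localhost:9999 (the ParameterTool defaults above), so --hostname and --port only need to be supplied when the text server lives somewhere else.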
2. Scala version
- Install Scala locally
- Configure the Scala plugin in IDEA
- Add the scala-maven-plugin to Maven:
```xml
<plugin>
  <groupId>net.alchim31.maven</groupId>
  <artifactId>scala-maven-plugin</artifactId>
  <executions>
    <execution>
      <id>scala-compile-first</id>
      <phase>process-resources</phase>
      <goals>
        <goal>add-source</goal>
        <goal>compile</goal>
      </goals>
    </execution>
    <execution>
      <id>scala-test-compile</id>
      <phase>process-test-resources</phase>
      <goals>
        <goal>testCompile</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```
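With the add-source and compile goals bound to process-resources as above, a plain mvn clean package should compile the Scala sources alongside any Java in the module; no separate Scala build step is needed.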
- Full code:
```scala
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object SocketWindowCount {

  def main(args: Array[String]): Unit = {

    val port = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        return
      }
    }

    // 1. Get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Read the input data from a socket connection
    val text = env.socketTextStream("localhost", port, '\n')

    // 3. Transformations
    /**
     * timeWindow(size, slide)
     * size: window size; slide: sliding interval. Three cases:
     * 1. Window size equals the slide: a tumbling window; records neither overlap nor get lost.
     * 2. Window size greater than the slide: windows overlap, i.e. some records are processed more than once.
     * 3. Window size smaller than the slide: records are inevitably lost; not recommended.
     */
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { x: String => WordWithCount(x, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5), Time.seconds(5))
      .sum(1)

    // 4. Print the results
    windowCounts.print().setParallelism(1)

    // 5. Execute
    env.execute("Socket window WordCount")
  }

  case class WordWithCount(value: String, i: Long)
}
```
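To make the three size/slide cases concrete, here is a minimal Java sketch against the same Flink 1.8-era DataStream API; the ports and the reuse of the Splitter class from the Java example above (assumed to be in the same package) are illustrative assumptions:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowVariants {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Case 1 - tumbling window: size == slide, every record is counted exactly once.
        env.socketTextStream("localhost", 9999)        // assumed local text server
           .flatMap(new WindowWordCount.Splitter())    // Splitter from the Java example above
           .keyBy(0)
           .timeWindow(Time.seconds(5))                // shorthand for timeWindow(5s, 5s)
           .sum(1)
           .print();

        // Case 2 - sliding window: size > slide, each record lands in two 10-second windows.
        env.socketTextStream("localhost", 9998)        // second assumed source
           .flatMap(new WindowWordCount.Splitter())
           .keyBy(0)
           .timeWindow(Time.seconds(10), Time.seconds(5))
           .sum(1)
           .print();

        env.execute("WindowVariants");
    }
}
```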
3. Testing in a local Windows development environment
Install netcat:
- Download: https://eternallybored.org/misc/netcat/
- Add it to the PATH environment variable

Open cmd and start nc listening on a port (on Windows the -p flag is required):

```bash
nc -l -p 9999
```
Configure the program arguments in IDEA, run the program, and type words separated by spaces into the cmd window; the counts are printed locally in IDEA.
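If installing netcat on Windows is a hassle, a few lines of plain Java can stand in for it. This SimpleTextServer is a hypothetical helper (not part of the original post) that forwards console input, line by line, to the first client that connects:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/** A stand-in for 'nc -l -p 9999': forwards stdin lines to one connected client. */
public class SimpleTextServer {
    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9999;
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept();                        // wait for the Flink job to connect
             PrintWriter out = new PrintWriter(client.getOutputStream(), true);
             BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
            String line;
            while ((line = in.readLine()) != null) {
                out.println(line);                                   // each line becomes one socket record
            }
        }
    }
}
```

Run it with the port as its argument, then point the WordCount job at the same port.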
4. Submitting to single-node Flink (Linux)
The Linux machine runs Flink 1.8.
- Build the jar locally with Maven:

```bash
mvn clean package
```

- Upload the jar to the Linux machine
- Start the single-node Flink cluster
- In another window, run `nc -l 9999` and type words separated by spaces
- Run the jar (-c specifies the entry class):

```bash
flink run -c com.qinglanmei.demo.flink.SocketWindowCount ./common-flink-core-1.0.jar --port 9999
```

- Check the output; on Flink the job's output goes to log/flink-root-taskexecutor-0-bigdata01.out:

```bash
tail -f /flink/flink-1.8.0/log/flink-root-taskexecutor-0-bigdata01.out
```
The word counts then stream into that log. Observing the Flink web UI, you can see:
- Total Jobs: Running = 1
- Available Task Slots = 0, which means the single node's task slot is already occupied, because each slot runs one parallel pipeline
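As a minimal sketch of the slot model (an illustration, not from the original post): a job's parallelism determines how many slots it claims, so on this single node one running job exhausts them:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A single TaskManager with the default taskmanager.numberOfTaskSlots: 1
        // offers exactly one slot. Parallelism 1 fills it (Available Task Slots
        // drops to 0); parallelism 2 would wait for a slot that never frees up.
        env.setParallelism(1);

        env.socketTextStream("localhost", 9999)   // assumes a local 'nc -l 9999'
           .map(String::toUpperCase)
           .print();

        env.execute("SlotDemo");
    }
}
```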
III. Cluster Installation
1. Standalone
Environment prerequisites:
- JDK 1.8+
- Passwordless SSH between the nodes (with the same installation directory on each)
Flink setup
Copy out the package built from source (see the previous post, 01.Flink筆記-編譯、部署) or download the binary package from the Flink website.
Node allocation: the three nodes 01-03 serve as master, worker01, and worker02 respectively.
flink-conf.yaml configuration:

```yaml
# Master node address
jobmanager.rpc.address: bigdata01
# Number of task slots
taskmanager.numberOfTaskSlots: 2
# The port defaults to 8081; it conflicts with one of my other components, so it is changed to 18081
rest.port: 18081
```
Optional settings:
- the amount of available memory per JobManager (jobmanager.heap.mb),
- the amount of available memory per TaskManager (taskmanager.heap.mb),
- the number of available CPUs per machine (taskmanager.numberOfTaskSlots),
- the total number of CPUs in the cluster (parallelism.default), and
- the temporary directories (taskmanager.tmp.dirs)
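As a purely illustrative sizing (these numbers are assumptions, not from this cluster): on 8-core workers you might set taskmanager.numberOfTaskSlots: 8, and with two such workers set parallelism.default: 16 so jobs use every slot by default.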
Edit the masters and slaves configuration files:

```bash
[root@bigdata01 conf]# vim masters
bigdata01:18081
[root@bigdata01 conf]# vim slaves
bigdata02
bigdata03
```
Copy the directory from node 01 to the other two nodes:

```bash
scp -r flink-1.8.0/ bigdata02:/flink/
scp -r flink-1.8.0/ bigdata03:/flink/
```
配置環境變數(每個節點)
vim /etc/profile #flink export FLINK_HOME=/flink/flink-1.8.0 export PATH=$FLINK_HOME/bin:$PATH
使其生效source /etc/profile
Start the Flink cluster:

```bash
start-cluster.sh
```

Check the processes on each node, then open the web UI.
UI overview:
- Task Managers: equals the number of workers, i.e. the nodes listed in the slaves file
- Task Slots: equals the number of workers × taskmanager.numberOfTaskSlots, the parameter set in flink-conf.yaml
- Available Task Slots: equals Task Slots when no job is running
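For example, with the two workers listed in slaves (bigdata02 and bigdata03) and taskmanager.numberOfTaskSlots: 2 from flink-conf.yaml above, the UI shows 2 Task Managers and 2 × 2 = 4 Task Slots.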
2. HA
Edit the configuration files
In flink-conf.yaml, high-availability mode does not need jobmanager.rpc.address; the JobManager nodes are listed in masters instead, and ZooKeeper handles leader election:

```yaml
#jobmanager.rpc.address: bigdata01
# Enable high-availability mode (required)
high-availability: zookeeper
# The ZooKeeper quorum is a replicated group of ZooKeeper servers providing the distributed coordination service (required)
high-availability.zookeeper.quorum: bigdata01:2181,bigdata02:2181,bigdata03:2181
# JobManager metadata is persisted in the file system storageDir; only a pointer to this state is stored in ZooKeeper (required)
high-availability.storageDir: hdfs:///flink/ha/
# The root ZooKeeper node, under which all cluster nodes are placed (recommended)
high-availability.zookeeper.path.root: /flink
# Custom cluster id (recommended)
high-availability.cluster-id: /flinkCluster
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/checkpoints
```
Edit masters:

```bash
[root@bigdata01 conf]# vim masters
bigdata01:18081
bigdata02:18081
```

Edit slaves:

```bash
[root@bigdata01 conf]# vim slaves
bigdata02
bigdata03
```
Edit conf/zoo.cfg:

```bash
# ZooKeeper quorum peers
server.1=bigdata01:2888:3888
server.2=bigdata02:2888:3888
server.3=bigdata03:2888:3888
```
其餘節點同步配置檔案
scp -r conf/ bigdata02:/flink/flink-1.8.0/ scp -r conf/ bigdata02:/flink/flink-1.8.0/
Startup
Start the ZooKeeper cluster first, then Hadoop, and finally Flink.
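One way to verify failover once everything is up: kill the active JobManager process, and ZooKeeper should elect the standby listed in masters, leaving the web UI reachable on the surviving node (bigdata01:18081 or bigdata02:18081).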