1. 程式人生 > >02.Flink的單機wordcount、叢集安裝

02.Flink的單機wordcount、叢集安裝

 一、單機安裝

1.準備安裝包

將原始碼編譯出的安裝包拷貝出來(編譯請參照上一篇01.Flink筆記-編譯、部署)或者在Flink官網下載bin包

2.配置

前置:jdk1.8+

修改配置檔案flink-conf.yaml

#Flink的預設WebUI埠號是8081,如果有衝突的服務,可更改
rest.port: 18081

 其餘項選擇預設即可

3.啟動

  • Linux:
./bin/start-cluster.sh

  • Win:
cd bin
start-cluster.bat

win本地啟動如下(圖片模糊可右擊在新標籤中開啟)

 

二、單機WordCount

1.java版本

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
 * @author :qinglanmei
 * @date :Created in 2019/4/10 11:10
 * @description:flink的java版本window示例
 */
public class WindowWordCount {
    public static void main(String[] args) throws Exception{
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.has("port") ? params.getInt("port"):9999;
   } catch (Exception e) { System.err.println("No port specified. Please run 'SocketWindowWordCount " + "--hostname <hostname> --port <port>', where hostname (localhost by default) " + "and port is the address of the text server"); System.err.println("To start a simple text server, run 'netcat -l <port>' and " + "type the input text into the command line"); return; } final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String,Integer>> dataStream = env .socketTextStream(hostname,port) .flatMap(new Splitter()) .keyBy(0) .timeWindow(Time.seconds(5),Time.seconds(5)) .sum(1); dataStream.print(); env.execute("WindowWordCount"); } public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for(String word : sentence.split(" ")){ out.collect(new Tuple2<String, Integer>(word, 1)); } } } }

2.scala版本

  • 本地安裝scala

  • idea配置scala外掛

  • scala的maven外掛

 1 <plugin>
 2                 <groupId>net.alchim31.maven</groupId>
 3                 <artifactId>scala-maven-plugin</artifactId>
 4                 <executions>
 5                     <execution>
 6                         <id>scala-compile-first</id>
 7                         <phase>process-resources</phase>
 8                         <goals>
 9                             <goal>add-source</goal>
10                             <goal>compile</goal>
11                         </goals>
12                     </execution>
13                     <execution>
14                         <id>scala-test-compile</id>
15                         <phase>process-test-resources</phase>
16                         <goals>
17                             <goal>testCompile</goal>
18                         </goals>
19                     </execution>
20                 </executions>
21             </plugin>
  • 程式碼詳細

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object SocketWindowCount {

  def main(args: Array[String]): Unit = {
    val port = try{
      ParameterTool.fromArgs(args).getInt("port")
    }catch {
      case e:Exception =>{
        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        return
      }
    }
    //1.獲取env
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2.通過socket連接獲取輸入資料

    val text = env.socketTextStream("localhost",port,'\n')
    //3.一系列操作
    /**
      * timeWindow(size,slide)
      * size:視窗大小,slide:滑動間隔。分為以下三種情況:
      * 1.	視窗大小等於滑動間隔:這個就是滾動視窗,資料不會重疊,也不會丟失資料。
      * 2.	視窗大小大於滑動間隔:這個時候會有資料重疊,也即是資料會被重複處理。
      * 3.	視窗大小小於滑動間隔:必然是會導致資料丟失,不可取。
      */
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map {x:String => WordWithCount(x,1)}
      .keyBy(0)
.timeWindow(Time.seconds(5),Time.seconds(5)) .sum(1) //4.輸出結果,列印 windowCounts.print().setParallelism(1) //5.env執行 env.execute("Socket window WordCount") } case class WordWithCount(value: String, i: Long) }

3.win本地開發環境測試

安裝netcat

  • 下載:https://eternallybored.org/misc/netcat/
  • 配置環境變數

開啟cmd啟動nc埠號監聽(win下需加-p)

nc -l -p 9999

 idea中配置輸出引數

執行程式、在cmd命令視窗輸入單詞、按空格分隔、在idea本地即可輸出結果

4.Flink單機提交(Linux)

Linux安裝的是Flink1.8

  • 本地maven打包
mvn clean package
  • 上傳jar包到Linux
  • 啟動Flink節點(單機)
  • 另起一視窗,開啟nc -l 9999,輸入單詞,按空格分隔
  • 執行jar包
flink run -c com.qinglanmei.demo.flink.SocketWindowCount ./common-flink-core-1.0.jar --port 9999
  • 檢視輸出結果,Flink上執行的輸出在log/flink-root-taskexecutor-0-bigdata01.out
 tail -f /flink/flink-1.8.0/log/flink-root-taskexecutor-0-bigdata01.out  

 結果如下

觀察Flink的Web介面

可以發現

  • Total Jobs的Running=1
  • Available Task Slots=0,說明單機的flink任務槽已經被佔用了,因為每個槽執行一個並行管道

三、叢集安裝

1.standalone

環境配置:

  • jdk1.8+
  • ssh免密(具有相同的安裝目錄)

Flink設定

將原始碼編譯出的安裝包拷貝出來(編譯請參照上一篇01.Flink筆記-編譯、部署)或者在Flink官網下載bin包

節點分配:三個節點01-03分別是master、worker01.work02

flink-conf.yaml配置

# 主節點地址
jobmanager.rpc.address: bigdata01
# 任務槽數
taskmanager.numberOfTaskSlots: 2
# 埠號預設8081,因為與我的其他組建衝突,故改成18081
rest.port: 18081

可選配置:

  • 每個JobManager(jobmanager.heap.mb)的可用記憶體量,
  • 每個TaskManager(taskmanager.heap.mb)的可用記憶體量,
  • 每臺機器的可用CPU數量(taskmanager.numberOfTaskSlots),
  • 叢集中的CPU總數(parallelism.default)和
  • 臨時目錄(taskmanager.tmp.dirs

修改配置檔案masters、slaves

[root@bigdata01 conf]# vim masters 
bigdata01:18081
[root@bigdata01 conf]# vim slaves 
bigdata02
bigdata03

拷貝01的目錄到另外兩臺節點

scp -r flink-1.8.0/ bigdata02:/flink/
scp -r flink-1.8.0/ bigdata03:/flink/

配置環境變數(每個節點)

vim /etc/profile
#flink
export FLINK_HOME=/flink/flink-1.8.0
export PATH=$FLINK_HOME/bin:$PATH

使其生效source /etc/profile

啟動Flink叢集

start-cluster.sh 

檢視程序

檢視web

介面說明

Task Managers:等於worker數,即slaves檔案中配置的節點數

Task Slots:等於worker數*taskmanager.numberOfTaskSlots,taskmanager.numberOfTaskSlots即flink-conf.yaml中配置的引數

Available Task Slots:在沒有job情況下等於Task Slots

2.HA

修改配置檔案

修改flink-conf.yaml,高可用模式不需要指定jobmanager.rpc.address,在masters中新增jobmanager節點,由zookeeper做選舉

 

#jobmanager.rpc.address: bigdata01
high-availability: zookeeper #指定高可用模式(必須)
high-availability.zookeeper.quorum: bigdata01:2181,bigdata02:2181,bigdata03:2181 #ZooKeeper仲裁是ZooKeeper伺服器的複製組,它提供分散式協調服務(必須)
high-availability.storageDir:hdfs: ///flink/ha/ #JobManager元資料儲存在檔案系統storageDir中,只有指向此狀態的指標儲存在ZooKeeper中(必須)
high-availability.zookeeper.path.root: /flink #根ZooKeeper節點,在該節點下放置所有叢集節點(推薦)
#自定義叢集(推薦)
high-availability.cluster-id: /flinkCluster
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/checkpoints

 

修改masters

[root@bigdata01 conf]# vim masters 
bigdata01:18081
bigdata02:18081

修改slaves

 

[root@bigdata01 conf]# vim slaves 
bigdata02
bigdata03      

 

修改conf/zoo.cfg

 

# ZooKeeper quorum peers
server.1=bigdata01:2888:3888
server.2=bigdata02:2888:3888
server.3=bigdata03:2888:3888

 

 

其餘節點同步配置檔案

 

scp -r conf/ bigdata02:/flink/flink-1.8.0/
scp -r conf/ bigdata02:/flink/flink-1.8.0/

 

啟動

先啟動zookeeper叢集、再啟動hadoop、最後啟動flink

 

 

&n