1. 程式人生 > >Flink入門(三)——環境與部署

Flink入門(三)——環境與部署

flink是一款開源的大資料流式處理框架,他可以同時批處理和流處理,具有容錯性、高吞吐、低延遲等優勢,本文簡述flink在windows和linux中安裝步驟,和示例程式的執行,包括本地除錯環境,叢集環境。另外介紹Flink的開發工程的構建。

首先要想執行Flink,我們需要下載並解壓Flink的二進位制包,下載地址如下:https://flink.apache.org/downloads.html

我們可以選擇Flink與Scala結合版本,這裡我們選擇最新的1.9版本Apache Flink 1.9.0 for Scala 2.12進行下載。

下載成功後,在windows系統中可以通過Windows的bat檔案或者Cygwin來執行Flink。

在linux系統中分為單機,叢集和Hadoop等多種情況。

通過Windows的bat檔案執行

首先啟動cmd命令列視窗,進入flink資料夾,執行bin目錄下的start-cluster.bat

注意:執行flink需要java環境,請確保系統已經配置java環境變數。

$ cd flink
$ cd bin
$ start-cluster.bat
Starting a local cluster with one JobManager process and one TaskManager process.
You can terminate the processes via CTRL-C in the spawned shell windows.
Web interface by default on http://localhost:8081/.

顯示啟動成功後,我們在瀏覽器訪問 http://localhost:8081/可以看到flink的管理頁面。

通過Cygwin執行

Cygwin是一個在windows平臺上執行的類UNIX模擬環境,官網下載:http://cygwin.com/install.html

安裝成功後,啟動Cygwin終端,執行start-cluster.sh指令碼。

$ cd flink
$ bin/start-cluster.sh
Starting cluster.

顯示啟動成功後,我們在瀏覽器訪問 http://localhost:8081/可以看到flink的管理頁面。

單節點安裝

在Linux上單節點安裝方式與cygwin一樣,下載Apache Flink 1.9.0 for Scala 2.12,然後解壓後只需要啟動start-cluster.sh。

叢集安裝

叢集安裝分為以下幾步:

1、在每臺機器上覆制解壓出來的flink目錄。

2、選擇一個作為master節點,然後修改所有機器conf/flink-conf.yaml

jobmanager.rpc.address = master主機名

3、修改conf/slaves,將所有work節點寫入

work01
work02

4、在master上啟動叢集

bin/start-cluster.sh

安裝在Hadoop

我們可以選擇讓Flink執行在Yarn叢集上。

下載Flink for Hadoop的包

 保證 HADOOP_HOME已經正確設定即可

啟動 bin/yarn-session.sh

執行flink示例程式

批處理示例:

提交flink的批處理examples程式:

bin/flink run examples/batch/WordCount.jar

這是flink提供的examples下的批處理例子程式,統計單詞個數。

$ bin/flink run examples/batch/WordCount.jar
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
(awry,1)
(ay,1)

得到結果,這裡統計的是預設的資料集,可以通過--input --output指定輸入輸出。

我們可以在頁面中檢視執行的情況:

流處理示例:

啟動nc伺服器:

nc -l 9000

提交flink的批處理examples程式:

bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

這是flink提供的examples下的流處理例子程式,接收socket資料傳入,統計單詞個數。

在nc端寫入單詞

$ nc -l 9000
lorem ipsum
ipsum ipsum ipsum
bye

輸出在日誌中

$ tail -f log/flink-*-taskexecutor-*.out
lorem : 1
bye : 1
ipsum : 4

停止flink

$ ./bin/stop-cluster.sh

在安裝好Flink以後,只要快速構建Flink工程,並完成相關程式碼開發,就可以輕鬆入手Flink。

構建工具

Flink專案可以使用不同的構建工具進行構建。為了能夠快速入門,Flink 為以下構建工具提供了專案模版:

  • Maven
  • Gradle

這些模版可以幫助你搭建專案結構並建立初始構建檔案。

Maven

環境要求

唯一的要求是使用 Maven 3.0.4 (或更高版本)和安裝 Java 8.x。

建立專案

使用以下命令之一來 建立專案:

使用Maven archetypes

 $ mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.9.0

執行quickstart指令碼

 curl https://flink.apache.org/q/quickstart.sh | bash -s 1.9.0

下載完成後,檢視專案目錄結構:

tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── org
        │       └── myorg
        │           └── quickstart
        │               ├── BatchJob.java
        │               └── StreamingJob.java
        └── resources
            └── log4j.properties

示例專案是一個 Maven project,它包含了兩個類:StreamingJobBatchJob 分別是 DataStream and DataSet 程式的基礎骨架程式。
main 方法是程式的入口,既可用於IDE測試/執行,也可用於部署。

我們建議你將 此專案匯入IDE 來開發和測試它。
IntelliJ IDEA 支援 Maven 專案開箱即用。如果你使用的是 Eclipse,使用m2e 外掛 可以
匯入 Maven 專案。
一些 Eclipse 捆綁包預設包含該外掛,其他情況需要你手動安裝。

請注意:對 Flink 來說,預設的 JVM 堆記憶體可能太小,你應當手動增加堆記憶體。
在 Eclipse 中,選擇 Run Configurations -> Arguments 並在 VM Arguments 對應的輸入框中寫入:-Xmx800m
在 IntelliJ IDEA 中,推薦從選單 Help | Edit Custom VM Options 來修改 JVM 選項。

構建專案

如果你想要 構建/打包你的專案,請在專案目錄下執行 ‘mvn clean package’ 命令。命令執行後,你將 找到一個JAR檔案,裡面包含了你的應用程式,以及已作為依賴項新增到應用程式的聯結器和庫:target/-.jar

注意: 如果你使用其他類而不是 StreamingJob 作為應用程式的主類/入口,我們建議你相應地修改 pom.xml 檔案中的 mainClass 配置。這樣,Flink 可以從 JAR 檔案執行應用程式,而無需另外指定主類。

Gradle

環境要求

唯一的要求是使用 Gradle 3.x (或更高版本) 和安裝 Java 8.x 。

建立專案

使用以下命令之一來 建立專案:

Gradle示例:

build.gradle

buildscript {
    repositories {
        jcenter() // this applies only to the Gradle 'Shadow' plugin
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
    }
}

plugins {
    id 'java'
    id 'application'
    // shadow plugin to produce fat JARs
    id 'com.github.johnrengelman.shadow' version '2.0.4'
}


// artifact properties
group = 'org.myorg.quickstart'
version = '0.1-SNAPSHOT'
mainClassName = 'org.myorg.quickstart.StreamingJob'
description = """Flink Quickstart Job"""

ext {
    javaVersion = '1.8'
    flinkVersion = '1.9.0'
    scalaBinaryVersion = '2.11'
    slf4jVersion = '1.7.7'
    log4jVersion = '1.2.17'
}


sourceCompatibility = javaVersion
targetCompatibility = javaVersion
tasks.withType(JavaCompile) {
    options.encoding = 'UTF-8'
}

applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]

task wrapper(type: Wrapper) {
    gradleVersion = '3.1'
}

// declare where to find the dependencies of your project
repositories {
    mavenCentral()
    maven { url "https://repository.apache.org/content/repositories/snapshots/" }
}

// 注意:我們不能使用 "compileOnly" 或者 "shadow" 配置,這會使我們無法在 IDE 中或通過使用 "gradle run" 命令執行程式碼。
// 我們也不能從 shadowJar 中排除傳遞依賴(請檢視 https://github.com/johnrengelman/shadow/issues/159)。
// -> 顯式定義我們想要包含在 "flinkShadowJar" 配置中的類庫!
configurations {
    flinkShadowJar // dependencies which go into the shadowJar

    // 總是排除這些依賴(也來自傳遞依賴),因為 Flink 會提供這些依賴。
    flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
    flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
    flinkShadowJar.exclude group: 'org.slf4j'
    flinkShadowJar.exclude group: 'log4j'
}

// declare the dependencies for your production and test code
dependencies {
    // --------------------------------------------------------------
    // 編譯時依賴不應該包含在 shadow jar 中,
    // 這些依賴會在 Flink 的 lib 目錄中提供。
    // --------------------------------------------------------------
    compile "org.apache.flink:flink-java:${flinkVersion}"
    compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"

    // --------------------------------------------------------------
    // 應該包含在 shadow jar 中的依賴,例如:聯結器。
    // 它們必須在 flinkShadowJar 的配置中!
    // --------------------------------------------------------------
    //flinkShadowJar "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"

    compile "log4j:log4j:${log4jVersion}"
    compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"

    // Add test dependencies here.
    // testCompile "junit:junit:4.12"
}

// make compileOnly dependencies available for tests:
sourceSets {
    main.compileClasspath += configurations.flinkShadowJar
    main.runtimeClasspath += configurations.flinkShadowJar

    test.compileClasspath += configurations.flinkShadowJar
    test.runtimeClasspath += configurations.flinkShadowJar

    javadoc.classpath += configurations.flinkShadowJar
}

run.classpath = sourceSets.main.runtimeClasspath

jar {
    manifest {
        attributes 'Built-By': System.getProperty('user.name'),
                'Build-Jdk': System.getProperty('java.version')
    }
}

shadowJar {
    configurations = [project.configurations.flinkShadowJar]
}

setting.gradle

rootProject.name = 'quickstart'

或者執行quickstart指令碼

    bash -c "$(curl https://flink.apache.org/q/gradle-quickstart.sh)" -- 1.9.0 2.11

檢視目錄結構:

tree quickstart/
quickstart/
├── README
├── build.gradle
├── settings.gradle
└── src
    └── main
        ├── java
        │   └── org
        │       └── myorg
        │           └── quickstart
        │               ├── BatchJob.java
        │               └── StreamingJob.java
        └── resources
            └── log4j.properties

示例專案是一個 Gradle 專案,它包含了兩個類:StreamingJobBatchJobDataStreamDataSet 程式的基礎骨架程式。main 方法是程式的入口,即可用於IDE測試/執行,也可用於部署。

我們建議你將 此專案匯入你的 IDE 來開發和測試它。IntelliJ IDEA 在安裝 Gradle 外掛後支援 Gradle 專案。Eclipse 則通過 Eclipse Buildship 外掛支援 Gradle 專案(鑑於 shadow 外掛對 Gradle 版本有要求,請確保在匯入嚮導的最後一步指定 Gradle 版本 >= 3.0)。你也可以使用 Gradle’s IDE integration 從 Gradle 建立專案檔案。

構建專案

如果你想要 構建/打包專案,請在專案目錄下執行 ‘gradle clean shadowJar’ 命令。命令執行後,你將 找到一個 JAR 檔案,裡面包含了你的應用程式,以及已作為依賴項新增到應用程式的聯結器和庫:build/libs/--all.jar

注意: 如果你使用其他類而不是 StreamingJob 作為應用程式的主類/入口,我們建議你相應地修改 build.gradle 檔案中的 mainClassName 配置。這樣,Flink 可以從 JAR 檔案執行應用程式,而無需另外指定主類。

Flink系列文章:

Flink入門(一)——Apache Flink介紹
Flink入門(二)——Flink架構介紹

更多實時計算,Flink,Kafka等相關技術博文,歡迎關注實時流式計算

相關推薦

Flink入門——環境部署

flink是一款開源的大資料流式處理框架,他可以同時批處理和流處理,具有容錯性、高吞吐、低延遲等優勢,本文簡述flink在windows和linux中安裝步驟,和示例程式的執行,包括本地除錯環境,叢集環境。另外介紹Flink的開發工程的構建。 首先要想執行Flink,我們需要下載並解壓Flink的二進位制

ES6快速入門模組

類與模組 一、類 一)類的宣告 class Person { constructor(name) { this.name = name; } sayName() { console.log(this.name); } } let t

PyQt5入門——訊號

此總結主要參考下面這篇文章:PyQt5事件和訊號 例子1、2的self繼承自QWidget, 1繼承自QWidget是因為vbox是QWidget類獨有的 1. 訊號槽 Signals & slots from PyQt5.QtCore import Qt

Python3入門——概述環境安裝

知乎 沒有 color https oob 環境 bsp 風格 初級 一、概述   1.python是什麽     Python 是一個高層次的結合了解釋性、編譯性、互動性和面向對象的腳本語言。 Python 是一種解釋型語言: 這意味著開發過程中沒有了編

Java的陣列定義方法過載——有C++基礎的Java入門

目錄 一、 陣列的定義 1、定義 2、 原理 3、 賦值方式 二、Java的方法過載 1、 概念 (1) 過載 (2) 目的 2、例子 (1) 比較兩個數值的大小 (2) 執行方式及結果 一、 陣列的定義 1、定義 資料型別 [ ]&n

idea部署Maven入門——環境變數的配置和下載

    介紹:       1  Maven是用來管理jar包的一種工具,       2  Maven主要是構建java專案和java web專案       &

Windows10下的docker安裝入門 建立自己的docker映象並且在容器中執行它

Docker 是一個開源的應用容器引擎,讓開發者可以打包他們的應用以及依賴包到一個可移植的容器中,然後釋出到任何流行的 Linux 機器上,也可以實現虛擬化。容器是完全使用沙箱機制,相互之間不會有任何介面。 本教程主要分以下幾點內容:  ------------

Flask入門~補充及虛擬環境

上篇文章中有幾個點不全面,在這裡補充幾點以及入門的幾個小方法: 上篇文章中使用jsonify模組讓網頁直接顯示json資料,返回的是二進位制碼, 如何解碼呢?以及開啟debug的幾個小方法: 程式碼如下: 方法一: 在py檔案中配置 # #解決中文亂碼問題,將json

Apache Flink 零基礎入門編寫最簡單的helloWorld

實驗環境 JDK 1.8 IDE Intellij idea Flink 1.8.1 實驗內容 建立一個Flink簡

分布式系統的那些事兒 - 系統系統之間的調用

數據格式 轉換 處理 分布 互調 圖片處理 動作 人性 並且 系統與系統之間的調用通俗來講,分為本地同一臺服務器上的服務相互調用與遠程服務調用,這個都可以稱之為RPC通信。淺白點講,客戶訪問服務器A,此時服務器要完成某個動作必須訪問服務器B,服務器A與B互相通信,相互調用,

Storm入門HelloWorld示例

right 出現 9.png context color tro order tput 執行 一、關聯代碼 使用maven,代碼如下。 pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:x

Android BLE終端通信——client服務端通信過程以及實現數據通信

.sh 沒有 indexof 實例 解析 rip listview filter @override Android BLE與終端通信(三)——client與服務端通信過程以及實現數據通信 前面的終究僅僅是小知識點。上不了臺面,也僅僅能算是起

java加密算法入門-非對稱加密詳解

共享數據 net clas 實例 查看 安全性 自己的 generator mir 1、簡單介紹 這幾天一直在看非對稱的加密,相比之前的兩篇內容,這次看了兩倍多的時間還雲裏霧裏的,所以這篇文章相對之前的兩篇,概念性的東西多了些,另外是代碼的每一步我都做了介紹,方便自己以後

Linux入門

ls cat hwclock cd date echo Linux常用命令: Linux文件系統: 1.文件名名稱嚴格區分大小寫 2.文件可以使用除/意外的任意字符;不建議使用特殊字符 3.文件名長度不能超過255個字符 4. 以. 開頭的文件為隱藏文件 工作目錄:workin

spring boot : 熱部署

pom.xml文件 添加 gin 字節 loader 信息 dev spring tool 介紹了Spring boot實現熱部署的兩種方式,這兩種方法分別是使用 Spring Loaded和使用spring-boot-devtools進行熱部署。 熱部署是什麽

Bootstrap入門:Less

樣式 ttr edi local 編輯 修改文件 方便 code b2c 很多時候我們需要定制Bootstrap的樣式,然後根制入門初步中,每次都定制網頁(http://v3.bootcss.com/customize/)生成我們需要的Css是一件很麻煩又不方便的事件。幸好

leaflet入門使用GeoJSON創建矢量圖形

onf ack type play coo bus blog content roc 點對象: function g(feature, layer) { // does this feature have a property named popu

Spring入門之IoC

使用 bsp martin 需要 容器 nbsp 依賴註入 tin 這就是 一、IoC定義   IoC,即控制反轉。開發者在使用類的實例之前,需要先創建對象的實例。但是IoC將創建實例的任務交給IoC容器,這樣開發應用代碼時只需要直接使用類的實例,這就是IoC。在討論控制反

吉他入門

入門 strong 一個 nbsp 吉他 音符 str bsp ron 節奏1: | 0 0 0 0| 每個0代表四分音符,以四分音符為一拍每小節一拍 節奏2 | _0_ _0_ _0_ _0_ _0_. _

HTML入門後臺系統顯示頁面_框架標簽

row http head span 技術分享 target html top logs <!DOCTYPE html> <html> <head> <meta charset="UTF-8">