1. 程式人生 > >Flink單節點安裝及執行筆記

Flink單節點安裝及執行筆記

Flink 一種大資料計算引擎,和其他計算引擎不同的是,它同時支援流處理和批處理的特點; 那麼首先介紹下,這兩點的概念。流處理,想象成水流,長江之水,自西而向東流,終匯入大海,源遠流長。我們把它類比到處理資料上,那麼可以這麼理解,資料來源源不斷地產生,無界限;批處理,想象成一湖水,天然形成,靜態,類比到處理資料上,就像處理靜態資料集,那麼這個資料集就是有界限;

學習Flink的目的,一方面是針對業務產生的日誌,可以就其做成監控系統,對軟體業務執行過程就行監測,提高解決問題的效率;其次一方面,結合AI領域和大資料領域技術學習,可以應對未來更多結合型產品的業務開發和實現提供更好的解決思路和方案。

如圖,https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/libs/ml/quickstart.html 很好的提供了AI領域相關處理的第三方類庫,對於結合學習還是大有裨益。

  • 安裝flink   因為我用的是mac 所以在有brew情況下, 直接 terminal 命令: brew install flink  我本機安裝完成後的目錄位置如下

  • **啟動單節點flink  ** 切換到目錄libexec下,輸入: ./start-cluster.sh 即可,然後訪問: 

    http://localhost:8081  ,看到如下介面:

  • 第三步,編寫flink處理的job,我用的是JAVA,程式碼依賴如下:

    package com.flink.demo;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    /**
     * @author fxl
     * @version 1.0.0
     * @createTime 2019年07月17日 17:23:00
     * @Description
     */
    public class App {
    
        public static void main(String[] args) throws Exception {
            //引數檢查
            if (args.length != 2) {
                System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
                return;
            }
    
            String hostname = args[0];
            Integer port = Integer.parseInt(args[1]);
    
    
            // set up the streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            //獲取資料
            DataStreamSource<String> stream = env.socketTextStream(hostname, port);
    
            //計數
            SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                    .keyBy(0)
                    .sum(1);
    
            sum.print();
    
            env.execute("Java WordCount from SocketTextStream Example");
        }
    
        public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
                String[] tokens = s.toLowerCase().split("\\W+");
    
                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }
    
    
    }
    
    

    maven依賴如下:

    <?xml version="1.0" encoding="UTF-8"?>
    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.flink.demo</groupId>
        <artifactId>flinkDemo</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <name>flinkDemo</name>
    
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.11</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>1.8.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>1.8.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.8.1</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.8.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-examples</artifactId>
                <version>1.8.1</version>
                <type>pom</type>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.8</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>23.0</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    

第四步,切換到flink版本資料夾下bin,執行命令: flink run -c com.flink.demo.App *.jar ip port  
         -c   指定程式入口類 我的執行類是: com.flink.demo.App
         ip    本機IP地址 127.0.0.1
         port 埠號為job啟動的指定埠號
         完整命令如下:

        fxl-2:bin fxl$ flink run -c com.flink.demo.App /Users/fxl/IdeaProjects/flinkDemo/target/flinkDemo-1.0-SNAPSHOT.jar 127.0.0.1 9008

        操作完成後,使用命令: nc -l 9008  監聽job程式埠,後面需要輸入測試資料入口

        

   第二張圖便是統計輸入單詞的一個