1. 程式人生 > 實用技巧 >Flink(三)【核心程式設計】

Flink(三)【核心程式設計】

目錄

和其他所有的計算框架一樣,flink也有一些基礎的開發步驟以及基礎,核心的API,當前Java版本,從開發步驟的角度來講,主要分為四大部分

一.Environment

批處理

// 批處理環境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

流處理

// 流式資料處理環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

二.Source

從集合讀取資料

import java.util.Arrays;
import java.util.List;

/**
 * @description:
 * @author: HaoWu
 * @create: 2020年09月16日
 */
public class Flink01_Source_Collection {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        // 1.Source:讀取資料
        DataStreamSource<Integer> listDS = env.fromCollection(list);
        // 2.列印
        listDS.print();
        // 3.執行
        env.execute();
    }
}

從檔案讀取資料

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @description:
 * @author: HaoWu
 * @create: 2020年09月16日
 */
public class Flink01_Source_File {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.Source:讀取資料
        DataStreamSource<String> fileDS = env.readTextFile("D:\\SoftWare\\idea-2019.2.3\\wordspace\\13_flinkdemo\\input");
        // 2.列印
        fileDS.print();
        // 3.執行
        env.execute();
    }
}

從kakfa讀取資料(常用)

官網:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html

依賴

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

注意:根據官網說明的flink和kafka版本的適配關係,選用對應得依賴。

java程式碼

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;

/**
 * @description: 從kafka讀取資料
 * @author: HaoWu
 * @create: 2020年09月16日
 */
public class Flink03_Source_Kafka {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.Source:讀取資料
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop102:9092");
        props.setProperty("group.id", "consumer-group");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset", "latest");

        DataStreamSource<String> kafkaDS = env.addSource(
                new FlinkKafkaConsumer011<>(
                        "flink-test",
                        new SimpleStringSchema(),
                        props));
        // 2.列印
        kafkaDS.print();
        // 3.執行
        env.execute();
    }
}

自定義資料來源

可以用來造資料,測試程式

pojo

public class WaterSensor {

    private String id;
    private Long ts;
    private Integer vc;
    ......
}

Java程式碼

import com.flink.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;

/**
 * @description: 自定義資料來源
 * @author: HaoWu
 * @create: 2020年09月16日
 */
public class Flink04_Source_CustomMySource {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.Source:讀取資料
        DataStreamSource<WaterSensor> mySourceDS = env.addSource(new MySourceFuntion());
        // 2.列印
        mySourceDS.print();
        // 3.執行
        env.execute();
    }

    /**
     * 自定義資料來源
     * 1. 實現 SourceFunction,指定輸出的型別
     * 2. 重寫 兩個方法:  run() ,cancel()
     */
    private static class MySourceFuntion implements SourceFunction<WaterSensor> {
        // 定義一個標誌位,控制資料的產生
        private Boolean flag = true;

        @Override
        public void run(SourceContext<WaterSensor> ctx) throws Exception {
            Random random = new Random();
            while (flag) {
                //構建bean物件
                WaterSensor waterSensor = new WaterSensor(
                        "sensor" + random.nextInt(3),
                        System.currentTimeMillis(),
                        random.nextInt(10) + 40
                );
                Thread.sleep(2000);
                ctx.collect(waterSensor);
            }
        }

        @Override
        public void cancel() {
            this.flag = false;
        }
    }
}

三.Transform

map

對映:將資料流中的資料進行轉換, 形成新的資料流,消費一個元素併產出一個元素

引數:Scala匿名函式或MapFunction

返回:DataStream

需求:將以下日誌轉為WaterSensor實體類

sensor-data.log

sensor_1,1549044122,1
sensor_1,1549044123,2
sensor_1,1549044124,3
sensor_2,1549044125,4

WaterSensor.java

public class WaterSensor {

    private String id;
    private Long ts;
    private Integer vc;
    ......
}

Java程式碼

import com.flink.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @description:
 * @author: HaoWu
 * @create: 2020年09月16日
 */
public class Flink05_Transform_Map {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.Source:讀取資料
        DataStreamSource<String> fileDS = env.readTextFile("D:\\SoftWare\\idea-2019.2.3\\wordspace\\13_flinkdemo\\input\\sensor-data.log");
        // 2.Transform: Map轉換為實體類
        SingleOutputStreamOperator<WaterSensor> mapDS = fileDS.map(new MyMapFunction());
        // 3.列印
        mapDS.print();
        // 4.執行
        env.execute();
    }

    /**
     * 實現MapFunction,指定輸入的型別,返回的型別
     * 重寫 map方法
     */
    private static class MyMapFunction implements MapFunction<String, WaterSensor> {
        @Override
        public WaterSensor map(String s) throws Exception {
            String[] data = s.split(",");
            return new WaterSensor(
                    data[0],
                    Long.valueOf(data[1]),
                    Integer.valueOf(data[2])

            );
        }
    }
}

Rich版本函式

Flink函式類都有其Rich版本,它與常規函式的不同在於,可以獲取執行環境的上下文,並擁有一些生命週期方法,所以可以實現更復雜的功能。也有意味著提供了更多的,更豐富的功能。例如:RichMapFunction

import com.flink.bean.WaterSensor;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @description:
 * @author: HaoWu
 * @create: 2020年09月16日
 */
public class Flink06_Transform_RichMap {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 全域性並行度設為 1
        env.setParallelism(1);
        // 1.Source:讀取資料
        DataStreamSource<String> fileDS = env.readTextFile("D:\\SoftWare\\idea-2019.2.3\\wordspace\\13_flinkdemo\\input\\sensor-data.log");
        // 2.Transform : RichMap實現轉換
        SingleOutputStreamOperator<WaterSensor> richMapDS = fileDS.map(new MyRichMap());
        // 3.列印
        richMapDS.print();
        // 4.執行
        env.execute();
    }

    /**
     * 繼承 RichMapFunction,指定輸入的型別,返回的型別
     * 提供了 open()和 close() 生命週期管理方法
     * 能夠獲取 執行時上下文物件 =》 可以獲取 狀態、任務資訊 等環境資訊
     */
    private static class MyRichMap extends RichMapFunction<String, WaterSensor> {
        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("open~~~");
        }

        @Override
        public WaterSensor map(String s) throws Exception {
            String[] data = s.split(",");
            RuntimeContext context = getRuntimeContext();
            String taskName = context.getTaskName();
            return new WaterSensor(
                    "taskName:"+taskName+"||"+data[0],
                    Long.valueOf(data[1]),
                    Integer.valueOf(data[2])
            );
        }

        @Override
        public void close() throws Exception {
            System.out.println("open~~~");
        }
    }
}

Rich Function有一個生命週期的概念。典型的生命週期方法有:

  1. open()方法是rich function的初始化方法,當一個運算元例如map或者filter被調 用之前open()會被呼叫

  2. close()方法是生命週期中的最後一個呼叫的方法,做一些清理工作

  3. getRuntimeContext()方法提供了函式的RuntimeContext的一些資訊,例如函式執行 的並行度,任務的名字,以及state狀態

flatMap

扁平對映:將資料流中的整體拆分成一個一個的個體使用,消費一個元素併產生零到多個元素

引數:Scala匿名函式或FlatMapFunction

返回:DataStream

需求:將集合中的集合資料乘以10.

list = {{1,2,3,4},{6,7,8,9}}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.List;

/**
 * @description:
 * @author: HaoWu
 * @create: 2020年09月16日
 */
public class Flink07_Transform_flatMap {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.Source:讀取資料
        DataStreamSource<List<Integer>> listDS = env.fromCollection(
                Arrays.asList(
                        Arrays.asList(1, 2, 3, 4),
                        Arrays.asList(5, 6, 7, 8)
                ));
        // 2.Transform: 扁平化
        SingleOutputStreamOperator<Integer> flaMapDS = listDS.flatMap(
                new FlatMapFunction<List<Integer>, Integer>() {
                    @Override
                    public void flatMap(List<Integer> integers, Collector<Integer> collector) throws Exception {
                        for (Integer integer : integers) {
                            collector.collect(integer * 10);
                        }
                    }
                }
        );
        // 3.列印
        flaMapDS.print();
        // 4.執行
        env.execute();
    }
}

keyBy

分流:根據指定的Key的hashcode將元素髮送到不同的分割槽,相同的Key會被分到一個分割槽(這裡分割槽指的就是下游運算元多個並行節點的其中一個)。keyBy()是通過雜湊來分割槽的

引數:Scala匿名函式或POJO屬性或元組索引,不能使用陣列

注意

1.建議實現 KeySelector的方式指定分組的key。位置索引 或 欄位名稱 ,返回 Key的型別,無法確定返回 Tuple,使用麻煩

2.和spark處理不一樣,Spark是直接聚合成list集合(key,List(value1,value2...)),Flink是給元素打標籤,相同標籤是同一組

返回:KeyedStream

java程式碼

import com.flink.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @description:
 * @author: HaoWu
 * @create: 2020年09月16日
 */
public class Flink08_Transform_KeyBy {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        // 1.Source:讀取資料
        DataStreamSource<String> fileDS = env.readTextFile("D:\\SoftWare\\idea-2019.2.3\\wordspace\\13_flinkdemo\\input\\sensor-data.log");
        fileDS.print("File:");
        // 2.Transform: Map轉換為實體類
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = fileDS.map(
                new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] data = s.split(",");
                        return new WaterSensor(
                                data[0],
                                Long.valueOf(data[1]),
                                Integer.valueOf(data[2])
                        );
                    }
                }
        );

        // 3.分組
        // 通過 位置索引 或 欄位名稱 ,返回 Key的型別,無法確定,所以會返回 Tuple,後續使用key的時候,很麻煩
        // 通過 明確的指定 key 的方式, 獲取到的 key就是具體的型別 => 實現 KeySelector 或 lambda
        KeyedStream<WaterSensor, String> waterSensorGroupDS = waterSensorDS.keyBy(
                new KeySelector<WaterSensor, String>() {
                    @Override
                    public String getKey(WaterSensor waterSensor) throws Exception {
                        return waterSensor.getId();
                    }
                }
        );
        // 4.列印
        waterSensorGroupDS.print("KeyBy");
        // 5.執行
        env.execute();
    }
}

注意

通過 位置索引 或 欄位名稱 ,返回 Key的型別,無法確定,所以會返回 Tuple,後續使用key的時候,很麻煩
通過 明確的指定 key 的方式, 獲取到的 key就是具體的型別 => 實現 KeySelector 或 lambda

filter

過濾:根據指定的規則將滿足條件(true)的資料保留,不滿足條件(false)的資料丟棄

引數:Scala匿名函式或FilterFunction

返回:DataStream

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;

/**
 * @description:
 * @author: HaoWu
 * @create: 2020年09月16日
 */
public class Flink09_Transform_Filter {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1.Source:讀取資料
        DataStreamSource<Integer> listDS = env.fromCollection(
                Arrays.asList(1, 2, 3, 4, 5, 6, 7));
        // 2.過濾:只保留偶數
        SingleOutputStreamOperator<Integer> filterDS = listDS.filter(
                new FilterFunction<Integer>() {
                    @Override
                    public boolean filter(Integer integer) throws Exception {
                        return integer % 2 == 0;
                    }
                }
        );
        // 2.列印
        filterDS.print();
        // 3.執行
        env.execute();
    }
}

shuffle

打亂重組(洗牌):將資料按照均勻分佈打散到下游

引數:無

返回:DataStream

split + select

split:根據資料特徵給資料打標籤; select:根據標籤取出目標資料

import com.flink.bean.WaterSensor;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;

/**
 * @description: TODO Split + Select
 * @author: HaoWu
 * @create: 2020年09月18日
 */
public class Flink11_Tranform_Split {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 全域性並行度設為 1
        env.setParallelism(2);
        // 1.Source:讀取資料
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.readTextFile("13_flinkdemo/input/sensor-data.log")
                .map(new Flink05_Transform_Map.MyMapFunction());
        SplitStream<WaterSensor> splitDS = sensorDS
                // TODO Split: 水位低於 50 正常,水位 [50,80) 警告, 水位高於 80 告警
                .split(
                        new OutputSelector<WaterSensor>() {
                            @Override
                            public Iterable<String> select(WaterSensor value) {
                                if (value.getVc() < 40) {
                                    return Arrays.asList("normal");
                                } else if (value.getVc() < 60) {
                                    return Arrays.asList("warn");
                                } else {
                                    return Arrays.asList("alert");
                                }
                            }
                        });
        //取出 normal
        splitDS.select("normal").print("normal");
        //取出 warn
        splitDS.select("warn").print("warn");
        //取出 alert
        splitDS.select("alert").print("alert");

        // 3.執行
        env.execute();
    }
}

connect

可以不同型別流連線,同床異夢; 只能兩條流,資料分開處理

將兩個不同來源的資料流進行連線,實現資料匹配,比如訂單支付和第三方交易資訊,這兩個資訊的資料就來自於不同資料來源,連線後,將訂單支付和第三方交易資訊進行對賬,此時,才能算真正的支付完成。

Flink中的connect運算元可以連線兩個保持他們型別的資料流,兩個資料流被Connect之後,只是被放在了一個同一個流中,內部依然保持各自的資料和形式不發生任何變化,兩個流相互獨立。

java程式碼

import com.flink.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import java.util.Arrays;

/**
 * @description: TODO Connect:可以不同型別流連線,同床異夢;  只能兩條流,資料分開處理
 *
 * @author: HaoWu
 * @create: 2020年09月18日
 */
public class Flink12_Tranform_Connect {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 全域性並行度設為 1
        env.setParallelism(2);
        // 第1條流
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.readTextFile("13_flinkdemo/input/sensor-data.log")
                .map(new Flink05_Transform_Map.MyMapFunction());
        // 第2條流
        DataStreamSource<Integer> listDS = env.fromCollection(Arrays.asList(1, 2, 3, 4, 2, 5, 6, 7));

        // TODO Connect
        ConnectedStreams<WaterSensor, Integer> connectDS = sensorDS.connect(listDS);
        // 做map轉換 ,實現CoMapFunction
        SingleOutputStreamOperator<Object> resultDS = connectDS.map(new CoMapFunction<WaterSensor, Integer, Object>() {
            @Override
            public Object map1(WaterSensor value) throws Exception {
                return value.toString();
            }

            @Override
            public Object map2(Integer value) throws Exception {
                return value + 10;
            }
        });
        // 列印
        resultDS.print();

        // 3.執行
        env.execute();
    }
}

union

對兩個或者兩個以上的流合為一體,流型別要相同,返回一個DataStream包含所有元素。

/**
 * @description: TODO:1.流型別必須相同,合為一體 ;2.可以連線多個流,資料一起處理
 *
 * @author: HaoWu
 * @create: 2020年09月18日
 */
public class Flink13_Tranform_Union {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 全域性並行度設為 1
        env.setParallelism(2);
        // 第1條流
        DataStreamSource<Integer> listDS1 = env.fromCollection(Arrays.asList(1, 2, 3, 4));
        // 第2條流
        DataStreamSource<Integer> listDS2 = env.fromCollection(Arrays.asList( 5, 6, 7,8));
        // 第3條流
        DataStreamSource<Integer> listDS3 = env.fromCollection(Arrays.asList( 5, 6, 7,8));

        // TODO Union
        DataStream<Integer> resultDS = listDS1.union(listDS2, listDS3);
        // 列印
        resultDS.print();
        // 3.執行
        env.execute();
    }
}

connect 和 union 的區別

1)  union之前兩個流的型別必須是一樣,connect可以不一樣
2)  connect只能操作兩個流,union可以操作多個。

四.Operator

keyBy對資料進行分流後,可以對資料進行相應的統計分析

滾動聚合運算元(Rolling Aggregation): sum、max、min

KeyedStream的每一個支流做聚合, 然後將聚合結果合併成1個DataStream

對每個key分組後,組內的聚合,然後將聚合結果返回。

注意:非key和非聚合的欄位,對分組和聚合不影響

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @description: TODO 對KeyStream每個流單獨聚合,將聚合結果合併,返回DataSream
 * @author: HaoWu
 * @create: 2020年09月18日
 */
public class Flink14_Tranform_RollingAgg {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.讀取資料
        env.socketTextStream("hadoop102", 9999)
                // 轉換:string -> Tuple3(id,ts,vc)
                .map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
                    @Override
                    public Tuple3<String, Long, Integer> map(String s) throws Exception {
                        String[] datas = s.split(",");
                        return new Tuple3.of(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                    }
                })
                // 按照id分組
                .keyBy(tuple3 -> tuple3.f0)
                //求和:sum
                //最大值:max
                //求最小值
                .min(2).print(); // 列印
        
        // 2.執行
        env.execute();
    }
}

測試

輸入資料

[root@hadoop102 ~]$ nc -lk 9999
sensor_1,1549044122,10
sensor_1,1549044122,5     
sensor_2,1549044123,20                  
sensor_1,1549044126,50
sensor_1,1549044128,70
sensor_1,1549044126111,2

實時輸出

5> (sensor_1,1549044122,10)
5> (sensor_1,1549044122,5)
2> (sensor_2,1549044123,20)
5> (sensor_1,1549044122,5)
5> (sensor_1,1549044122,5)
5> (sensor_1,1549044122,2)

reduce

分組後,對組內的資料兩兩做歸約操作

注意:每組的第一個元素不進行reduce操作,直接輸出,當第二個元素到來才會歸約。

/**
 * @description: TODO 一個分組資料流的聚合操作,合併當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果
 * @author: HaoWu
 * @create: 2020年09月18日
 */
public class Flink15_Tranform_Reduce {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.讀取資料
        env.socketTextStream("hadoop102", 9999)
                // 轉換:string -> Tuple3(id,ts,vc)
                .map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
                    @Override
                    public Tuple3<String, Long, Integer> map(String s) throws Exception {
                        String[] datas = s.split(",");
                        return Tuple3.of(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                    }
                })
                // 按照id分組
                .keyBy(tuple3 -> tuple3.f0)
                // 分組內聚合
                .reduce(new ReduceFunction<Tuple3<String, Long, Integer>>() {
                            @Override
                            public Tuple3<String, Long, Integer> reduce(Tuple3<String, Long, Integer> t0, 
                                                                        Tuple3<String, Long, Integer> t1) throws Exception {
                                return Tuple3.of(t0.f0, 123L, t0.f2 + t1.f2);
                            }
                        }

                ).print();

        // 2.執行
        env.execute();
    }
}

process

keyBy進行分流處理後,如果想要處理過程中獲取環境相關資訊,可以採用process運算元自定義實現

/**
 * @description: TODO keyBy進行分流處理後,如果想要處理過程中獲取環境相關資訊
 * @author: HaoWu
 * @create: 2020年09月18日
 */
public class Flink16_Tranform_Process {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.讀取資料
        env.socketTextStream("hadoop102", 9999)
                // 轉換:string -> Tuple3(id,ts,vc)
                .map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
                    @Override
                    public Tuple3<String, Long, Integer> map(String s) throws Exception {
                        String[] datas = s.split(",");
                        return Tuple3.of(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                    }
                })
                // 按照id分組
                .keyBy(tuple3 -> tuple3.f0)
                // 獲取環境資訊
                .process(
                        new KeyedProcessFunction<String, Tuple3<String, Long, Integer>, String>() {
                            @Override
                            public void processElement(Tuple3<String, Long, Integer> value, Context ctx, Collector<String> out) throws Exception {
                                String currentKey = ctx.getCurrentKey();
                                Long timestamp = ctx.timestamp();
                                out.collect("當前資料的key:"+currentKey+",當前的時間戳:"+timestamp+",資料:"+value.toString());
                            }
                        }
                ).print();//列印
        // 2.執行
        env.execute();
    }
}

測試

輸入資料

sensor_1,1549044122,5
sensor_2,1549044123,20

輸出資料

5> 當前資料的key:sensor_1,當前的時間戳:null,資料:(sensor_1,1549044122,5)
2> 當前資料的key:sensor_2,當前的時間戳:null,資料:(sensor_2,1549044123,20)

五.Sink

Kafka Sink

官網:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html

將處理完的資料傳送到Kafka訊息佇列中

依賴

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

注意:根據官網說明的flink和kafka版本的適配關係,選用對應得依賴。

java程式碼

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

/**
 * @description: 輸出到 Kafka
 * @author: HaoWu
 * @create: 2020年09月18日
 */
public class Flink17_Sink_Kafka {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.讀取socket流
        env.socketTextStream("hadoop102", 9999)
                //寫入kafka
                .addSink(new FlinkKafkaProducer011(
                        "hadoop102:9092,hadoop103:9092,hadoop104:9092",
                        "flink-test2",
                        new SimpleStringSchema())
                );
        // 2.執行
        env.execute();
    }
}

Redis Sink

處理完的資料傳送到Redis快取資料庫

依賴

        <!-- redis依賴 -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

java程式碼

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * @description: 輸出到 Redis
 * @author: HaoWu
 * @create: 2020年09月18日
 */
public class Flink18_Sink_Redis {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.讀取socket流
        env.socketTextStream("hadoop102", 9999)
                //寫入Redis
                .addSink(new RedisSink<String>(
                        new FlinkJedisPoolConfig.Builder()
                                .setHost("hadoop102")
                                .setPort(6379)
                                .build(),
                        new RedisMapper<String>() {
                            // redis 的命令: key是最外層的 key
                            @Override
                            public RedisCommandDescription getCommandDescription() {
                                return new RedisCommandDescription(RedisCommand.HSET, "sensor0421");
                            }

                            // Hash型別:這個指定的是 hash 的key
                            @Override
                            public String getKeyFromData(String data) {
                                String[] datas = data.split(",");
                                return datas[1];
                            }

                            // Hash型別:這個指定的是 hash 的 value
                            @Override
                            public String getValueFromData(String data) {
                                String[] datas = data.split(",");
                                return datas[2];
                            }
                        }
                ));
        // 2.執行
        env.execute();
    }
}

ElasticSearch Sink

依賴

  <!-- ES依賴 -->
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
      <version>1.10.0</version>
  </dependency>

java程式碼

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * @description: 輸出到 ES
 * @author: HaoWu
 * @create: 2020年09月18日
 */
public class Flink19_Sink_ES {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.讀取socket流
        DataStreamSource<String> socktDS = env.readTextFile("13_flinkdemo/input/sensor-data.log");
        // 2.寫入ES
        // 2.1建立ES Sink
        ElasticsearchSink<String> esBuildSink = new ElasticsearchSink.Builder<>(
                Arrays.asList(
                        new HttpHost("hadoop102", 9200),
                        new HttpHost("hadoop103", 9200),
                        new HttpHost("hadoop104", 9200)
                ),
                new ElasticsearchSinkFunction<String>() {
                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        // 將資料放在Map中
                        Map<String, String> dataMap = new HashMap<>();
                        dataMap.put("data", element);
                        // 建立 IndexRequest =》 指定index,指定type,指定source
                        IndexRequest indexRequest = Requests.indexRequest("sensor").type("_doc").source(dataMap);
                        // 新增到 RequestIndexer
                        indexer.add(indexRequest);
                    }
                }
        ).build();
        //寫入ES
        socktDS.addSink(esBuildSink);
        // 2.執行
        env.execute();
    }
}

自定義Sink(寫mysql)

如果Flink沒有提供給我們可以直接使用的聯結器,那我們如果想將資料儲存到我們自己的儲存裝置中,怎麼辦?沒事,Flink提供了自定義Sink,你自己決定如何進行儲存。

依賴

        <!-- mysql連線 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>

java程式碼

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * @description: 自定義Sink輸出到Mysql
 * @author: HaoWu
 * @create: 2020年09月18日
 */
public class Flink20_Sink_CustomMySQLSink {
    public static void main(String[] args) throws Exception {
        // 0.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.讀取資料
        DataStreamSource<String> inputDS = env.socketTextStream("hadoop102", 9999);

        // 2.寫入Mysql
        inputDS.addSink(new RichSinkFunction<String>() {

                            private Connection conn = null;
                            private PreparedStatement pstmt = null;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test", "root", "root");
                                pstmt = conn.prepareStatement("INSERT INTO sensor VALUES (?,?,?)");
                            }

                            /**
                             * 關流
                             * @throws Exception
                             */
                            @Override
                            public void close() throws Exception {
                                pstmt.close();
                                conn.close();
                            }

                            /**
                             * 填充佔位符
                             * @param value 輸入資料
                             * @param context 上下文
                             * @throws Exception
                             */
                            @Override
                            public void invoke(String value, Context context) throws Exception {
                                String[] datas = value.split(",");
                                pstmt.setString(1, datas[0]);
                                pstmt.setLong(2, Long.valueOf(datas[1]));
                                pstmt.setInt(3, Integer.valueOf(datas[2]));
                                pstmt.execute();
                            }
                        }
        );
        // 3.執行
        env.execute();
    }
}