Flink DataStreamAPI與DataSetAPI應用案例實戰-Flink牛刀小試
版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:[email protected],如有任何學術交流,可隨時聯絡。
1 DataStreamAPI
1.1 DataStream Data Sources
-
source是程式的資料來源輸入,你可以通過StreamExecutionEnvironment.addSource(sourceFunction)來為你的程式新增一個source。
-
flink提供了大量的已經實現好的source方法,可以自定義source 通過實現sourceFunction介面來自定義無並行度的source。
1 使用並行度為1的source public class MyNoParalleSource implements SourceFunction<Long>{ private long count = 1L; private boolean isRunning = true; /** * 主要的方法 * 啟動一個source * 大部分情況下,都需要在這個run方法中實現一個迴圈,這樣就可以迴圈產生資料了 * * @param ctx * @throws Exception */ @Override public void run(SourceContext<Long> ctx) throws Exception { while(isRunning){ ctx.collect(count); count++; //每秒產生一條資料 Thread.sleep(1000); } } * 取消一個cancel的時候會呼叫的方法 @Override public void cancel() { isRunning = false; } } 2 Main方法執行 public class StreamingDemoWithMyNoPralalleSource { public static void main(String[] args) throws Exception { //獲取Flink的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //獲取資料來源 DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1); //注意:針對此source,並行度只能設定為1 DataStream<Long> num = text.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { System.out.println("接收到資料:" + value); return value; } }); //每2秒鐘處理一次資料 DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0); //列印結果 sum.print().setParallelism(1); String jobName = StreamingDemoWithMyNoPralalleSource.class.getSimpleName(); env.execute(jobName); } } 複製程式碼
-
可以通過實現ParallelSourceFunction介面或者繼承RichParallelSourceFunction來自定義有並行度的source。繼承RichParallelSourceFunction的那些SourceFunction意味著它們都是並行執行的並且可能有一些資源需要open/close
public class MyParalleSource implements ParallelSourceFunction<Long> { private long count = 1L; private boolean isRunning = true; /** * 主要的方法 * 啟動一個source * 大部分情況下,都需要在這個run方法中實現一個迴圈,這樣就可以迴圈產生資料了 * * @param ctx * @throws Exception */ @Override public void run(SourceContext<Long> ctx) throws Exception { while(isRunning){ ctx.collect(count); count++; //每秒產生一條資料 Thread.sleep(1000); } } /** * 取消一個cancel的時候會呼叫的方法 * */ @Override public void cancel() { isRunning = false; } } public class StreamingDemoWithMyPralalleSource { public static void main(String[] args) throws Exception { //獲取Flink的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //獲取資料來源 DataStreamSource<Long> text = env.addSource(new MyParalleSource()).setParallelism(2); DataStream<Long> num = text.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { System.out.println("接收到資料:" + value); return value; } }); //每2秒鐘處理一次資料 DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0); //列印結果 sum.print().setParallelism(1); String jobName = StreamingDemoWithMyPralalleSource.class.getSimpleName(); env.execute(jobName); } } public class MyRichParalleSource extends RichParallelSourceFunction<Long> { private long count = 1L; private boolean isRunning = true; /** * 主要的方法 * 啟動一個source * 大部分情況下,都需要在這個run方法中實現一個迴圈,這樣就可以迴圈產生資料了 * @param ctx * @throws Exception */ @Override public void run(SourceContext<Long> ctx) throws Exception { while(isRunning){ ctx.collect(count); count++; //每秒產生一條資料 Thread.sleep(1000); } } /** * 取消一個cancel的時候會呼叫的方法 * */ @Override public void cancel() { isRunning = false; } /** * 這個方法只會在最開始的時候被呼叫一次 * 實現獲取連結的程式碼 * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { System.out.println("open............."); super.open(parameters); } /** * 實現關閉連結的程式碼 * @throws Exception */ @Override public void close() throws Exception { super.close(); } } public class StreamingDemoWithMyRichPralalleSource { public static void main(String[] args) throws Exception { //獲取Flink的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //獲取資料來源 DataStreamSource<Long> text = env.addSource(new MyRichParalleSource()).setParallelism(2); DataStream<Long> num = text.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { System.out.println("接收到資料:" + value); return value; } }); //每2秒鐘處理一次資料 DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0); //列印結果 sum.print().setParallelism(1); String jobName = StreamingDemoWithMyRichPralalleSource.class.getSimpleName(); env.execute(jobName); } } 複製程式碼
-
基於檔案 readTextFile(path) 讀取文字檔案,檔案遵循TextInputFormat 讀取規則,逐行讀取並返回。
-
基於socket socketTextStream從socker中讀取資料,元素可以通過一個分隔符切開。
public class SocketDemoFullCount { public static void main(String[] args) throws Exception{ //獲取需要的埠號 int port; try { ParameterTool parameterTool = ParameterTool.fromArgs(args); port = parameterTool.getInt("port"); }catch (Exception e){ System.err.println("No port set. use default port 9010--java"); port = 9010; } //獲取flink的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String hostname = "SparkMaster"; String delimiter = "\n"; //連線socket獲取輸入的資料 DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter); DataStream<Tuple2<Integer,Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer,Integer>>() { @Override public Tuple2<Integer,Integer> map(String value) throws Exception { return new Tuple2<>(1,Integer.parseInt(value)); } }); intData.keyBy(0) .timeWindow(Time.seconds(5)) .process(new ProcessWindowFunction<Tuple2<Integer,Integer>, String, Tuple, TimeWindow>() { @Override public void process(Tuple key, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<String> out) throws Exception { System.out.println("執行process"); long count = 0; for(Tuple2<Integer,Integer> element: elements){ count++; } out.collect("window:"+context.window()+",count:"+count); } }).print(); //這一行程式碼一定要實現,否則程式不執行 env.execute("Socket window count"); } } 複製程式碼
-
基於集合 fromCollection(Collection) 通過java 的collection集合建立一個數據流,集合中的所有元素必須是相同型別的。
public class StreamingFromCollection { public static void main(String[] args) throws Exception { //獲取Flink的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ArrayList<Integer> data = new ArrayList<>(); data.add(10); data.add(15); data.add(20); //指定資料來源 DataStreamSource<Integer> collectionData = env.fromCollection(data); //通map對資料進行處理 DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return value + 1; } }); //直接列印 num.print().setParallelism(1); env.execute("StreamingFromCollection"); } 複製程式碼
}
-
自定義輸入 addSource 可以實現讀取第三方資料來源的資料 系統內建提供了一批connectors,聯結器會提供對應的source支援【kafka】
1.2 DataStream Transformations
-
map:輸入一個元素,然後返回一個元素,中間可以做一些清洗轉換等操作
-
flatmap:輸入一個元素,可以返回零個,一個或者多個元素
-
keyBy:根據指定的key進行分組,相同key的資料會進入同一個分割槽
dataStream.keyBy("someKey") // 指定物件中的 "someKey"欄位作為分組key dataStream.keyBy(0) //指定Tuple中的第一個元素作為分組key 注意:以下型別是無法作為key的 1:一個實體類物件,沒有重寫hashCode方法,並且依賴object的hashCode方法 2:一個任意形式的陣列型別 3:基本資料型別,int,long 複製程式碼
-
filter:過濾函式,對傳入的資料進行判斷,符合條件的資料會被留下。
public class StreamingDemoFilter { public static void main(String[] args) throws Exception { //獲取Flink的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //獲取資料來源 DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:針對此source,並行度只能設定為1 DataStream<Long> num = text.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { System.out.println("原始接收到資料:" + value); return value; } }); //執行filter過濾,滿足條件的資料會被留下 DataStream<Long> filterData = num.filter(new FilterFunction<Long>() { //把所有的奇數過濾掉 @Override public boolean filter(Long value) throws Exception { return value % 2 == 0; } }); DataStream<Long> resultData = filterData.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { System.out.println("過濾之後的資料:" + value); return value; } }); //每2秒鐘處理一次資料 DataStream<Long> sum = resultData.timeWindowAll(Time.seconds(2)).sum(0); //列印結果 sum.print().setParallelism(1); String jobName = StreamingDemoFilter.class.getSimpleName(); env.execute(jobName); } } 複製程式碼
-
reduce:對資料進行聚合操作,結合當前元素和上一次reduce返回的值進行聚合操作,然後返回一個新的值
-
aggregations:sum(),min(),max()等
-
window:在後面單獨詳解
-
Union:合併多個流,新的流會包含所有流中的資料,但是union是一個限制,就是所有合併的流型別必須是一致的。
public class StreamingDemoUnion { public static void main(String[] args) throws Exception { //獲取Flink的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //獲取資料來源 DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:針對此source,並行度只能設定為1 DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1); //把text1和text2組裝到一起 DataStream<Long> text = text1.union(text2); DataStream<Long> num = text.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { System.out.println("原始接收到資料:" + value); return value; } }); //每2秒鐘處理一次資料 DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0); //列印結果 sum.print().setParallelism(1); String jobName = StreamingDemoUnion.class.getSimpleName(); env.execute(jobName); } 複製程式碼
}
-
Connect:和union類似,但是隻能連線兩個流,兩個流的資料型別可以不同,會對兩個流中的資料應用不同的處理方法。
-
CoMap, CoFlatMap:在ConnectedStreams中需要使用這種函式,類似於map和flatmap
public class StreamingDemoConnect { public static void main(String[] args) throws Exception { //獲取Flink的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //獲取資料來源 DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:針對此source,並行度只能設定為1 DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1); SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() { @Override public String map(Long value) throws Exception { return "str_" + value; } }); ConnectedStreams<Long, String> connectStream = text1.connect(text2_str); SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() { @Override public Object map1(Long value) throws Exception { return value; } @Override public Object map2(String value) throws Exception { return value; } }); //列印結果 result.print().setParallelism(1); String jobName = StreamingDemoConnect.class.getSimpleName(); env.execute(jobName); } } 複製程式碼
-
Split:根據規則把一個數據流切分為多個流:
-
Select:和split配合使用,選擇切分後的流
public class StreamingDemoSplit { public static void main(String[] args) throws Exception { //獲取Flink的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //獲取資料來源 DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:針對此source,並行度只能設定為1 //對流進行切分,按照資料的奇偶性進行區分 SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() { @Override public Iterable<String> select(Long value) { ArrayList<String> outPut = new ArrayList<>(); if (value % 2 == 0) { outPut.add("even");//偶數 } else { outPut.add("odd");//奇數 } return outPut; } }); //選擇一個或者多個切分後的流 DataStream<Long> evenStream = splitStream.select("even"); DataStream<Long> oddStream = splitStream.select("odd"); DataStream<Long> moreStream = splitStream.select("odd","even"); //列印結果 moreStream.print().setParallelism(1); String jobName = StreamingDemoSplit.class.getSimpleName(); env.execute(jobName); } } 複製程式碼
1.3 DataStream API之partition
-
Random partitioning:隨機分割槽
dataStream.shuffle()
-
Rebalancing:對資料集進行再平衡,重分割槽,消除資料傾斜
dataStream.rebalance()
-
Rescaling:如果上游操作有2個併發,而下游操作有4個併發,那麼上游的一個併發結果分配給下游的兩個併發操作,另外的一個併發結果分配給了下游的另外兩個併發操作。另一方面,下游有兩個併發操作而上游又4個併發操作,那麼上游的其中兩個操作的結果分配給下游的一個併發操作而另外兩個併發操作的結果則分配給另外一個併發操作。
-
Rescaling與Rebalancing的區別: Rebalancing會產生全量重分割槽,而Rescaling不會。、
dataStream.rescale()
-
Custom partitioning:自定義分割槽需要實現Partitioner介面
public class MyPartition implements Partitioner<Long> { @Override public int partition(Long key, int numPartitions) { System.out.println("分割槽總數:"+numPartitions); if(key % 2 == 0){ return 0; }else{ return 1; } } } public class SteamingDemoWithMyParitition { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()); //對資料進行轉換,把long型別轉成tuple1型別 DataStream<Tuple1<Long>> tupleData = text.map(new MapFunction<Long, Tuple1<Long>>() { @Override public Tuple1<Long> map(Long value) throws Exception { return new Tuple1<>(value); } }); //分割槽之後的資料 DataStream<Tuple1<Long>> partitionData= tupleData.partitionCustom(new MyPartition(), 0); DataStream<Long> result = partitionData.map(new MapFunction<Tuple1<Long>, Long>() { @Override public Long map(Tuple1<Long> value) throws Exception { System.out.println("當前執行緒id:" + Thread.currentThread().getId() + ",value: " + value); return value.getField(0); } }); result.print().setParallelism(1); env.execute("SteamingDemoWithMyParitition"); } } 複製程式碼
-
dataStream.partitionCustom(partitioner, "someKey") 或者dataStream.partitionCustom(partitioner, 0);
-
Broadcasting:在後面單獨詳解