1. 程式人生 > >Apache Flink -Streaming(DataStream API)

Apache Flink -Streaming(DataStream API)

... type .get ast cal @override 定期 例如 collector

綜述:

  • 在Flink中DataStream程序是在數據流上實現了轉換的常規程序。

1.示範程序

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class WindowWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream
<Tuple2<String, Integer>> dataStream = env .socketTextStream("localhost", 9999) .flatMap(new Splitter()) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1); dataStream.print(); env.execute("Window WordCount"); }
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word: sentence.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } } }
nc -lk 9999

2.數據源

  • 程序從源讀取輸入。可以通過StreamExecutionEnvironment.addSource(sourceFunction)給程序附上源。
  • 在StreamExecutionEnvironment中有一些可訪問的預定義的流數據源: readTextFile(path) 逐行作為字符串讀取文本文件 readFile(fileInputFormat, path) 通過指定的文件輸入格式(the specified file input format)讀取文件 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) 這是一個被前兩個方法內部調用的方法。它基於給定fileInputFormat在path下讀取文件,根據提供的watchType,這個源會定期監測(每 interval ms)新數據的路徑。
  • 基於套接字的 socketTextStream 從套接字讀取。元素可以由一個分隔符分開。
  • 基於集合的 fromCollection(Collection) 從Java Java.util.Collection創建一個數據流,集合中的所有元素必須是相同類型的。 fromCollection(Iterator, Class) 從一個叠代器創建一個數據流,類指定叠代器返回的元素的數據類型。 fromElements(T ...) 從給定的對象的序列創建一個數據流,所有對象必須是相同類型的。 fromParallelCollection(SplittableIterator, Class) 在並行執行中,從一個叠代器創建一個數據流,類指定叠代器返回的元素的數據類型。 generateSequence(from, to) 在給定的時間間隔內,生成的數字序列,並行執行。
  • 自定義的 addSource 附上一個新的源函數。例如要從Apache Kafka讀取,可以用addSource(new FlinkKafkaConsumer08<>(...))。

3.DataStream Transformations 參照運算符。

4.Data Sinks 數據接收

Apache Flink -Streaming(DataStream API)