Apache Flink -Streaming(DataStream API)
阿新 • • 發佈:2018-08-07
... type .get ast cal @override 定期 例如 collector
綜述:
- 在Flink中DataStream程序是在數據流上實現了轉換的常規程序。
1.示範程序
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; importorg.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class WindowWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> dataStream = env .socketTextStream("localhost", 9999) .flatMap(new Splitter()) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1); dataStream.print(); env.execute("Window WordCount"); }public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word: sentence.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } } }
nc -lk 9999
2.數據源
- 程序從源讀取輸入。可以通過StreamExecutionEnvironment.addSource(sourceFunction)給程序附上源。
- 在StreamExecutionEnvironment中有一些可訪問的預定義的流數據源: readTextFile(path) 逐行作為字符串讀取文本文件 readFile(fileInputFormat, path) 通過指定的文件輸入格式(the specified file input format)讀取文件 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) 這是一個被前兩個方法內部調用的方法。它基於給定fileInputFormat在path下讀取文件,根據提供的
watchType,這個源會定期監測(每 interval ms)新數據的路徑。
- 基於套接字的 socketTextStream 從套接字讀取。元素可以由一個分隔符分開。
- 基於集合的 fromCollection(Collection) 從Java Java.util.Collection創建一個數據流,集合中的所有元素必須是相同類型的。 fromCollection(Iterator, Class) 從一個叠代器創建一個數據流,類指定叠代器返回的元素的數據類型。 fromElements(T ...) 從給定的對象的序列創建一個數據流,所有對象必須是相同類型的。 fromParallelCollection(SplittableIterator, Class) 在並行執行中,從一個叠代器創建一個數據流,類指定叠代器返回的元素的數據類型。 generateSequence(from, to) 在給定的時間間隔內,生成的數字序列,並行執行。
- 自定義的 addSource 附上一個新的源函數。例如要從Apache Kafka讀取,可以用addSource(new FlinkKafkaConsumer08<>(...))。
3.DataStream Transformations 參照運算符。
4.Data Sinks 數據接收
Apache Flink -Streaming(DataStream API)