Flink(2):Flink的Source源
阿新 • • 發佈:2021-06-20
相關文章連結
1、Flink的Source源之從集合中獲取資料
1 //1.env 2 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 3 4 //2.source 5 // * 1.env.fromElements(可變引數); 6 DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink"); 7 // * 2.env.fromColletion(各種集合); 8 DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));9 // * 3.env.generateSequence(開始,結束); 10 DataStream<Long> ds3 = env.generateSequence(1, 10); 11 12 //3.Transformation 13 14 //4.sink 15 ds1.print(); 16 ds2.print(); 17 ds3.print(); 18 19 //5.execute 20 env.execute();
2、Flink的Source源之從檔案中獲取資料
//建立Flink流的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.source // * 1.env.readTextFile(本地檔案/HDFS檔案);//壓縮檔案也可以 DataStream<String> ds1 = env.readTextFile("D:\\Project\\IDEA\\bigdata-study-tutorial\\flink-tutorial-java\\src\\main\\data\\input\\words.txt"); DataStream<String> ds2 = env.readTextFile("data/input/dir"); DataStream<String> ds3 = env.readTextFile("hdfs://node1:8020//wordcount/input/words.txt"); DataStream<String> ds4 = env.readTextFile("data/input/wordcount.txt.gz"); //3.Transformation //4.sink ds1.print(); ds2.print(); ds3.print(); ds4.print(); //5.execute env.execute();
3、Flink之自定義Source源
public static void main(String[] args) throws Exception { //##### 建立Flink流的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // env.setParallelism(4); // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 新增自定義source源 DataStreamSource<SensorReading> dataStream = env.addSource(new SensorReadingSource(1, 1000L)); // 列印資料 dataStream.print(); //##### 啟動執行環境 env.execute("SourceDemo02_UDF"); } /** * 溫度感測器的bean類 */ @Data @AllArgsConstructor @NoArgsConstructor public static class SensorReading { private String id; private Long timestamp; private Double temperature; } /** * 自定義SensorReading源 */ public static class SensorReadingSource extends RichSourceFunction<SensorReading> { /** * 設定標識 */ private boolean running = true; private Integer num = 5; private Long interval = 1000L; public SensorReadingSource() { } /** * 根據傳入的 數量 和 時間間隔 建立自定義源 * * @param num 要建立的感測器數量 * @param interval 建立的感測器時間間隔 */ public SensorReadingSource(Integer num, Long interval) { this.num = num; this.interval = interval; } @Override public void run(SourceContext<SensorReading> ctx) throws Exception { // 定義一個隨機物件 Random random = new Random(); // 初始化num個感測器 ArrayList<SensorReading> sensorReadings = new ArrayList<>(); for (int i = 1; i <= num; i++) { String id = "sensor_" + i; long timestamp = System.currentTimeMillis(); double temperature = 60 + random.nextGaussian() * 10; sensorReadings.add(new SensorReading(id, timestamp, temperature)); } // 當running為true時,生成資料 while (running) { long timestamp = System.currentTimeMillis(); for (SensorReading sensorReading : sensorReadings) { sensorReading.setTimestamp(timestamp); sensorReading.setTemperature(sensorReading.getTemperature() + random.nextGaussian()); ctx.collect(sensorReading); } Thread.sleep(interval); } } @Override public void cancel() { running = false; } }