
Flink Core: Stream Processing and Batch Processing

Stream Processing

package com.shujia.flink.core

// import the implicit conversions
import org.apache.flink.streaming.api.scala._

object Demo1StreamWordCount {
  def main(args: Array[String]): Unit = {

    /**
      * Build the Flink streaming environment
      */
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Set the parallelism; the right parallelism is driven by the data volume
    // env.setParallelism(3)

    // Read from a socket to build a DataStream
    // start the source first: nc -lk 8888
    val lineDS: DataStream[String] = env.socketTextStream("master", 8888)

    // 1. Split each line into words
    val wordDS: DataStream[String] = lineDS.flatMap(_.split(","))

    // 2. Convert to key-value pairs
    val kvDS: DataStream[(String, Int)] = wordDS.map((_, 1))

    // 3. Group by key; under the hood this is hash partitioning, so keyBy causes a shuffle
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)

    // 4. Aggregate the values
    // sum accepts either a field name or a field index
    val countDS: DataStream[(String, Int)] = keyByDS.sum(1)

    // Print the result
    countDS.print()

    // Start the Flink job
    env.execute()
  }
}
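To try the streaming job locally (a hedged example; the host "master" and port 8888 are taken from the code above), start the socket source before submitting the job, then type comma-separated lines into it:

$ nc -lk 8888
java,flink
java

Because sum(1) keeps running state per key, the printed output should look roughly like this, where the number before ">" is the index of the parallel subtask that emitted the record (the exact indices depend on your parallelism):

2> (java,1)
4> (flink,1)
2> (java,2)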

Batch Processing

package com.shujia.flink.core

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object Demo2BatchWordCount {
  def main(args: Array[String]): Unit = {
    // Create the Flink batch environment
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    // 1. Read the data
    // a DataSet is roughly the equivalent of an RDD
    val linesDS: DataSet[String] = env.readTextFile("data/words.txt")

    // Split the lines into words, then count per word
    val countDS: AggregateDataSet[(String, Int)] = linesDS
      .flatMap(_.split(","))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    // countDS.print()

    // Save the result
    countDS.writeAsText("data/count", WriteMode.OVERWRITE)

    // Start the job
    env.execute()

    /**
      * Batch mode: if you only print the result, you do not need to call execute(),
      * because print() already triggers execution internally.
      * If you save the result to a file, you do need to call execute().
      */
  }
}
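One detail worth noting: when the job runs with parallelism greater than 1, writeAsText("data/count", WriteMode.OVERWRITE) creates data/count as a directory containing one output file per parallel subtask. If a single output file is preferred, the sink's parallelism can be set to 1; a minimal sketch reusing countDS from above:

// Force a single output file instead of one file per subtask
countDS
  .writeAsText("data/count", WriteMode.OVERWRITE)
  .setParallelism(1)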
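For reference, newer Flink releases deprecate the DataSet API in favor of running the DataStream API in batch execution mode. Below is a minimal sketch of the same word count written that way, assuming Flink 1.12 or later; the object name Demo3BatchMode is made up for illustration:

package com.shujia.flink.core

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala._

object Demo3BatchMode {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Run this DataStream program with bounded (batch) semantics
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)

    // Same word count as above, expressed on a bounded stream
    env.readTextFile("data/words.txt")
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()

    env.execute()
  }
}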