1. 程式人生 > 其它 >Flink 入門實戰之Sink elasticSearch7

Flink 入門實戰之Sink elasticSearch7

技術標籤:Flink入門實戰系列、flink、kafka、elasticsearch、es

  • 讀取Kafka資料
  • 轉化為樣例類
  • sink es
package com.skin

import java.util.Properties

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

/** Example Flink streaming job: read comma-separated lines from a Kafka topic,
  * convert each line into a [[SinkES.User]] and index it into Elasticsearch 7.
  */
object SinkES {

  /** User record parsed from one Kafka message.
    *
    * Fields are taken positionally from the comma-separated input line:
    * `id,name,sex,age,ts`.
    */
  case class User(id: String, name: String, sex: String, age: Int, ts: Long)

  /** Job entry point.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    // 1. Obtain the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Read the raw string records from Kafka.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "jeff200:9092")

    val kfkConsumer = new FlinkKafkaConsumer[String](
      "test1",
      new SimpleStringSchema(),
      properties
    )
    // Start reading from the earliest available offset.
    kfkConsumer.setStartFromEarliest()

    // Split every line on ',' and build a User from the five positional fields.
    // NOTE(review): a line with fewer than five fields or a non-numeric age/ts
    // throws inside this map function; there is no handling for malformed records.
    val dataStream = env
      .addSource(kfkConsumer)
      .map { data =>
        val arr = data.split(",")
        User(arr(0), arr(1), arr(2), arr(3).toInt, arr(4).toLong)
      }

    // 3. Sink the stream to Elasticsearch.
    val httpHosts = new java.util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("jeff200", 9200, "http"))

    val esSinkBuilder = new ElasticsearchSink.Builder[User](
      httpHosts,
      new ElasticsearchSinkFunction[User] {
        /** Builds one index request for `element` and hands it to `indexer`. */
        override def process(element: User, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
          // Only the id (stored under "uid") and the name are written to the document.
          val json = new java.util.HashMap[String, Any]()
          json.put("uid", element.id)
          json.put("name", element.name)

          val rqst: IndexRequest = Requests.indexRequest()
            .index("user-index")
            .source(json)

          indexer.add(rqst)
        }
      }
    )
    // Cap each bulk request at a single action.
    esSinkBuilder.setBulkFlushMaxActions(1)

    dataStream.addSink(esSinkBuilder.build())

    // 4. Launch the job.
    env.execute("SinkES Job")
  }
}
  • 核心依賴
<!-- es connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <!-- 可選,為了解決生產環境日誌衝突 -->
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
  • 注意

  • 由於es依賴多,還有日誌、http版本衝突的問題,生產環境需要耐心引入jar包,如下圖
    在這裡插入圖片描述
    在這裡插入圖片描述

  • 執行結果

GET user-index/_search
{
  "query": {
    "match_all": {}
  }
}
//返回
{
  "took" : 14,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "user-index",
        "_type" : "_doc",
        "_id" : "tA_5eHYBkydVxCEiuGOp",
        "_score" : 1.0,
        "_source" : {
          "uid" : "1",
          "name" : "男"
        }
      },
      {
        "_index" : "user-index",
        "_type" : "_doc",
        "_id" : "tQ_5eHYBkydVxCEiuWNc",
        "_score" : 1.0,
        "_source" : {
          "uid" : "2",
          "name" : "女"
        }
      },
      {
        "_index" : "user-index",
        "_type" : "_doc",
        "_id" : "tg_5eHYBkydVxCEiuWOD",
        "_score" : 1.0,
        "_source" : {
          "uid" : "3",
          "name" : "女"
        }
      }
    ]
  }
}