Flink 入門實戰之Sink elasticSearch7
阿新 • 發佈:2020-12-21
技術標籤:Flink入門實戰系列、flink、kafka、elasticsearch、es
- 讀取Kafka資料
- 轉化為樣例類
- Sink 寫入 Elasticsearch
package com.skin
import java.util.Properties

import scala.util.{Failure, Success, Try}

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
/**
 * Flink job that reads comma-separated user records from the Kafka topic `test1`,
 * converts them to [[SinkES.User]] values and indexes them into the Elasticsearch 7
 * index `user-index`.
 */
object SinkES {

  /** A user record parsed from one Kafka message of the form `id,name,sex,age,ts`. */
  final case class User(id: String, name: String, sex: String, age: Int, ts: Long)

  /** Minimum number of comma-separated fields a message needs to build a [[User]]. */
  private val FieldCount = 5

  /**
   * Parses one Kafka message into a [[User]].
   *
   * @param line raw message, expected as `id,name,sex,age,ts`; fields beyond the fifth are ignored
   * @return the parsed user, or `None` (after reporting on stderr) when the line has fewer than
   *         five fields or `age` / `ts` are not valid integers
   */
  private[skin] def parseUser(line: String): Option[User] = {
    val fields = line.split(",")
    if (fields.length < FieldCount) {
      Console.err.println(s"SinkES: skipping record with fewer than $FieldCount fields: '$line'")
      None
    } else {
      Try(User(fields(0), fields(1), fields(2), fields(3).toInt, fields(4).toLong)) match {
        case Success(user) => Some(user)
        case Failure(e) =>
          Console.err.println(s"SinkES: skipping malformed record '$line': ${e.getMessage}")
          None
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // 1. Obtain the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Kafka source: raw CSV lines from topic "test1", read from the earliest offset.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "jeff200:9092")
    val kfkConsumer = new FlinkKafkaConsumer[String](
      "test1",
      new SimpleStringSchema(),
      properties
    )
    kfkConsumer.setStartFromEarliest()

    // Malformed lines are dropped rather than thrown: an exception here fails the job, and on
    // restart the same record is typically read again, turning one bad message into a restart loop.
    val dataStream: DataStream[User] = env
      .addSource(kfkConsumer)
      .flatMap((line: String) => parseUser(line).toList)

    // 3. Elasticsearch 7 sink.
    val httpHosts = new java.util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("jeff200", 9200, "http"))

    val esSinkBuilder = new ElasticsearchSink.Builder[User](
      httpHosts,
      new ElasticsearchSinkFunction[User] {
        /** Emits one index request per user; only `uid` and `name` go into the document. */
        override def process(element: User, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
          val json = new java.util.HashMap[String, String]()
          json.put("uid", element.id)
          json.put("name", element.name)
          val rqst: IndexRequest = Requests.indexRequest()
            .index("user-index")
            .source(json)
          indexer.add(rqst)
        }
      }
    )

    // Flush after every single action so documents become visible immediately. That is handy for
    // this demo, but it sends one bulk request per record; raise it for real workloads.
    esSinkBuilder.setBulkFlushMaxActions(1)
    dataStream.addSink(esSinkBuilder.build())

    // 4. Run the job.
    env.execute("SinkES Job")
  }
}
- 核心依賴
<!-- Flink Elasticsearch 7 connector. The "_2.11" suffix is the Scala binary version the
     connector was built for; keep it in sync with the project's Scala version. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>${flink.version}</version>
<!-- NOTE(review): ${scope} is not defined in this snippet; presumably a POM property
     (e.g. provided vs. compile per build profile) - confirm. -->
<scope>${scope}</scope>
</dependency>
<!-- Optional: added to resolve logging-library conflicts in the production environment. -->
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
- 注意

  由於 Elasticsearch 的依賴較多,並且存在日誌、HTTP 等庫的版本衝突問題,生產環境中需要耐心地逐個引入所需的 jar 包(原文此處附有依賴清單截圖)。

- 執行結果
GET user-index/_search
{
"query": {
"match_all": {}
}
}
// 返回結果
{
"took" : 14,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 3,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "user-index",
"_type" : "_doc",
"_id" : "tA_5eHYBkydVxCEiuGOp",
"_score" : 1.0,
"_source" : {
"uid" : "1",
"name" : "男"
}
},
{
"_index" : "user-index",
"_type" : "_doc",
"_id" : "tQ_5eHYBkydVxCEiuWNc",
"_score" : 1.0,
"_source" : {
"uid" : "2",
"name" : "女"
}
},
{
"_index" : "user-index",
"_type" : "_doc",
"_id" : "tg_5eHYBkydVxCEiuWOD",
"_score" : 1.0,
"_source" : {
"uid" : "3",
"name" : "女"
}
}
]
}
}