spark2.x寫入資料到ElasticSearch5.X叢集
阿新 • • 發佈:2019-02-08
首先說明,到目前為止,我使用過spark1.6寫入資料到ES2.4中,使用很簡單。
當我使用spark1.6寫入到ES5.5的時候,一直不成功。
官網首先就講了
through the dedicated support available since 2.1 or through the Map/Reduce bridge since 2.0. Spark 2.0 is supported in elasticsearch-hadoop since
version 5.0.
在本地測試,發現使用spark2.x可以,就改scala版本為 2.11.8,將maven依賴換成spark2.1.0版本的,反正在本地測試,修改的東西並不多。
修改maven依賴,一定要注意版本的準確。
<!--<dependency>-->
<!--<groupId>org.elasticsearch</groupId>-->
<!--<artifactId>elasticsearch-spark-20_2.11</artifactId>-->
<!--<version>5.5.0</version>-->
<!--</dependency>-->
第一點改變的就是,這裡的用法並不像之前的,使用rdd.saveToEs(。。。)就可以寫入elasticsearch,在spark2.x中,會報錯沒有這個方法。
官方文件中有這個例子,並給出了步驟,這樣操作可以成功寫入,並且寫入的兩列,就是 departure, arrival
像上圖這種,那麼自定義的rdd,如何將內容寫入ES,如下:
package com.bigsun
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark
import org.elasticsearch.spark.rdd.EsSpark
object App{
case class Trip(content: String)
def main(args: Array[String]): Unit = {
println( "Hello World!" )
System.setProperty("hadoop.home.dir", "G:\\hadoop_home")
val spark = SparkSession.builder()
.appName("SparkTest")
.master("local[5]")
.config("es.index.auto.create", "true")
.config("pushdown", "true")
.config("es.nodes", "192.168.2.5")
.config("es.port", "9200")
.config("es.nodes.wan.only", "true")
.getOrCreate()
//從ES中讀取資料
val sparkDF = spark.sqlContext.read.format("org.elasticsearch.spark.sql").load("index/external")
sparkDF.take(10).foreach(println(_))
import spark.implicits._
val data = spark.read.textFile("g:\\mydata\\*")
//寫入到ES,一定要按照這個格式,因為這種格式才帶有元資料資訊,content就是ES中的列名
val rdd = data.rdd.map{
x => Trip(x)
}
EsSpark.saveToEs(rdd, "index/external")
spark.stop()
}
}
一定要將rdd轉換成case class中的格式,按照我的理解,這裡就是指定在ES中的列名。 最後將完整的春江花月夜寫入到ES中。
spark1.x是否可以這樣寫入,還待測試。如果想知道答案,請聯絡我。
最後,網上關於es5.x 和 spark2.x資料真的不是很多。 碰到問題,首先查官網!這是這次的教訓。
圖顯示不出來?就是ES的截圖而已。