1. 程式人生 > >spark2.x寫入資料到ElasticSearch5.X叢集

spark2.x寫入資料到ElasticSearch5.X叢集

首先說明,到目前為止,我使用過spark1.6寫入資料到ES2.4中,使用很簡單。 當我使用spark1.6寫入到ES5.5的時候,一直不成功。 官網首先就講了 through the dedicated support available since 2.1 or through the Map/Reduce bridge since 2.0. Spark 2.0 is supported in elasticsearch-hadoop since version 5.0. 在本地測試,發現使用spark2.x可以,就改scala版本為 2.11.8,將maven依賴換成spark2.1.0版本的,反正在本地測試,修改的東西並不多。 修改maven依賴,一定要注意版本的準確。 <!--<dependency>-->   <!--<groupId>org.elasticsearch</groupId>-->   <!--<artifactId>elasticsearch-spark-20_2.11</artifactId>-->   <!--<version>5.5.0</version>--> <!--</dependency>--> 第一點改變的就是,這裡的用法並不像之前的,使用rdd.saveToEs(。。。)就可以寫入elasticsearch,在spark2.x中,會報錯沒有這個方法。 官方文件中有這個例子,並給出了步驟,這樣操作可以成功寫入,並且寫入的兩列,就是  departure,  arrival  像上圖這種,那麼自定義的rdd,如何將內容寫入ES,如下: package com.bigsun import org.apache.spark.sql.SparkSession import org.elasticsearch.spark import org.elasticsearch.spark.rdd.EsSpark object App{   case class Trip(content: String)   def main(args: Array[String]): Unit = {     println( "Hello World!" )     System.setProperty("hadoop.home.dir", "G:\\hadoop_home")     val spark = SparkSession.builder()       .appName("SparkTest")       .master("local[5]")       .config("es.index.auto.create", "true")       .config("pushdown", "true")       .config("es.nodes", "192.168.2.5")       .config("es.port", "9200")       .config("es.nodes.wan.only", "true")       .getOrCreate()     //從ES中讀取資料     val sparkDF = spark.sqlContext.read.format("org.elasticsearch.spark.sql").load("index/external")     sparkDF.take(10).foreach(println(_))     import spark.implicits._     val data = spark.read.textFile("g:\\mydata\\*")     //寫入到ES,一定要按照這個格式,因為這種格式才帶有元資料資訊,content就是ES中的列名
    val rdd = data.rdd.map{       x => Trip(x)     }     EsSpark.saveToEs(rdd, "index/external")     spark.stop()   } } 一定要將rdd轉換成case class中的格式,按照我的理解,這裡就是指定在ES中的列名。 最後將完整的春江花月夜寫入到ES中。 spark1.x是否可以這樣寫入,還待測試。如果想知道答案,請聯絡我。 最後,網上關於es5.x 和 spark2.x資料真的不是很多。 碰到問題,首先查官網!這是這次的教訓。 圖顯示不出來?就是ES的截圖而已。