Reading and Writing Elasticsearch with Spark
Versions
Spark: 2.3.1
Elasticsearch: 6.4.0
1 Reading and writing Elasticsearch with Spark in Scala
1.1 Dependencies
1.1.1 Spark dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
    </exclusions>
</dependency>
1.1.2 Elasticsearch dependency
<!-- elasticsearch -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>6.4.0</version>
</dependency>
1.2 Reading and writing ES with RDDs
Reading and writing ES with an RDD mainly uses two methods: esRDD() on SparkContext and saveToEs() on RDD. Both only become available after importing the ES package:
import org.elasticsearch.spark._
1.2.1 Writing data to ES
Before that, define a case class that will be used to build the RDD:
case class Course(name: String, credit: Int)
val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local")
conf.set("es.nodes", "192.168.1.188")
conf.set("es.port", "9200")
conf.set("es.index.auto.create", "true")
val sc = new SparkContext(conf)
// Write the RDD to ES
val courseRdd = sc.makeRDD(Seq(Course("Hadoop", 4), Course("Spark", 3), Course("Kafka", 4)))
courseRdd.saveToEs("/course/rdd")
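By default, ES generates a random _id for each document (they show up in the read output in 1.2.2). saveToEs also takes a configuration map, so an existing field can be used as the document id instead. A minimal sketch, assuming the course name is a suitable unique key:
// Use the "name" field as the ES document _id instead of an auto-generated one
courseRdd.saveToEs("/course/rdd", Map("es.mapping.id" -> "name"))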
1.2.2 Reading data from ES
// Read the RDD back from ES
val esCourseRdd = sc.esRDD("/course/rdd")
esCourseRdd.foreach(c => {
  println(c.toString())
})
/**
* (vNHejmUBdoXqZPoSocAx,Map(name -> Kafka, credit -> 4))
* (u9HejmUBdoXqZPoSocAx,Map(name -> Spark, credit -> 3))
* (utHejmUBdoXqZPoSocAx,Map(name -> Hadoop, credit -> 4))
*/
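esRDD also accepts a query as a second argument, so the filter is executed by ES instead of being applied after the data reaches Spark. A minimal sketch; the ?q= query below is only an illustration:
// Fetch only the documents whose name field matches "Spark"
val sparkOnlyRdd = sc.esRDD("/course/rdd", "?q=name:Spark")
sparkOnlyRdd.foreach(println)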
1.3 Reading and writing ES with DataFrames
To read and write ES with Spark SQL, the ES package likewise needs to be imported:
import org.elasticsearch.spark.sql._
1.3.1 Writing a DataFrame to ES
// Write the DataFrame to ES
val df = spark.read.format("csv").option("header", true).option("inferSchema", true).load("hdfs://192.168.1.188:9000/data/Beijing_2017_HourlyPM25_created20170803.csv")
df.select("Year", "Month", "Day", "Hour", "Value").saveToEs("/csv/dataframe")
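The same write can also go through the standard DataFrameWriter API with the org.elasticsearch.spark.sql format, which is the style used in the PySpark section below. A sketch assuming the same connection settings as above (the append save mode is an assumption):
// Equivalent write through the data source API instead of the saveToEs implicit
df.select("Year", "Month", "Day", "Hour", "Value")
  .write.format("org.elasticsearch.spark.sql")
  .option("es.nodes", "192.168.1.188")
  .option("es.port", "9200")
  .mode("append")
  .save("csv/dataframe")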
1.3.2 Reading a DataFrame from ES
// Read the DataFrame back from ES
val esDf = spark.esDF("/csv/dataframe")
esDf.show()
/**
* +---+----+-----+-----+----+
* |Day|Hour|Month|Value|Year|
* +---+----+-----+-----+----+
* | 1| 0| 1| 505|2017|
* | 1| 2| 1| 466|2017|
* | 1| 14| 1| 596|2017|
* | 1| 17| 1| 522|2017|
* | 1| 21| 1| 452|2017|
* | 2| 1| 1| 466|2017|
* | 2| 7| 1| 93|2017|
* | 2| 8| 1| 27|2017|
* | 2| 9| 1| 17|2017|
* | 2| 13| 1| 251|2017|
* | 2| 16| 1| 251|2017|
* | 3| 2| 1| 341|2017|
* | 3| 8| 1| 365|2017|
* | 3| 9| 1| 361|2017|
* | 3| 21| 1| 542|2017|
* | 3| 22| 1| 548|2017|
* | 4| 3| 1| 590|2017|
* | 4| 6| 1| 482|2017|
* | 4| 17| 1| 323|2017|
* | 4| 22| 1| 369|2017|
* +---+----+-----+-----+----+
*/
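Like esRDD, esDF accepts an optional query string, which is useful when only a slice of the index is needed. A minimal sketch; the query below is only an illustration:
// Fetch only the documents whose Day field is 1; the query string is passed to ES as-is
val firstDayDf = spark.esDF("/csv/dataframe", "?q=Day:1")
firstDayDf.show()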
1.4 Complete code
package com.hollysys.spark.elasticsearch
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
/**
* Created by shirukai on 2018/8/31
* Reading and writing ES with Spark
*/
object EsTest {
  def main(args: Array[String]): Unit = {
    import org.elasticsearch.spark._
    import org.elasticsearch.spark.sql._
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local")
    conf.set("es.nodes", "192.168.1.188")
    conf.set("es.port", "9200")
    conf.set("es.index.auto.create", "true")
    val sc = new SparkContext(conf)
    val spark = SparkSession.builder.config(conf).getOrCreate()

    // Write the RDD to ES
    val courseRdd = sc.makeRDD(Seq(Course("Hadoop", 4), Course("Spark", 3), Course("Kafka", 4)))
    courseRdd.saveToEs("/course/rdd")

    // Read the RDD back from ES
    val esCourseRdd = sc.esRDD("/course/rdd")
    esCourseRdd.foreach(c => {
      println(c.toString())
    })
    /**
     * (vNHejmUBdoXqZPoSocAx,Map(name -> Kafka, credit -> 4))
     * (u9HejmUBdoXqZPoSocAx,Map(name -> Spark, credit -> 3))
     * (utHejmUBdoXqZPoSocAx,Map(name -> Hadoop, credit -> 4))
     */

    // Write the DataFrame to ES
    val df = spark.read.format("csv").option("header", true).option("inferSchema", true).load("hdfs://192.168.1.188:9000/data/Beijing_2017_HourlyPM25_created20170803.csv")
    df.select("Year", "Month", "Day", "Hour", "Value").saveToEs("/csv/dataframe")

    // Read the DataFrame back from ES
    val esDf = spark.esDF("/csv/dataframe")
    esDf.show()
    /**
     * +---+----+-----+-----+----+
     * |Day|Hour|Month|Value|Year|
     * +---+----+-----+-----+----+
     * | 1| 0| 1| 505|2017|
     * | 1| 2| 1| 466|2017|
     * | 1| 14| 1| 596|2017|
     * | 1| 17| 1| 522|2017|
     * | 1| 21| 1| 452|2017|
     * | 2| 1| 1| 466|2017|
     * | 2| 7| 1| 93|2017|
     * | 2| 8| 1| 27|2017|
     * | 2| 9| 1| 17|2017|
     * | 2| 13| 1| 251|2017|
     * | 2| 16| 1| 251|2017|
     * | 3| 2| 1| 341|2017|
     * | 3| 8| 1| 365|2017|
     * | 3| 9| 1| 361|2017|
     * | 3| 21| 1| 542|2017|
     * | 3| 22| 1| 548|2017|
     * | 4| 3| 1| 590|2017|
     * | 4| 6| 1| 482|2017|
     * | 4| 17| 1| 323|2017|
     * | 4| 22| 1| 369|2017|
     * +---+----+-----+-----+----+
     */
  }
}

case class Course(name: String, credit: Int)
2 Writing data to Elasticsearch with PySpark
Writing data to Elasticsearch from PySpark mainly relies on writing to an external data source, which requires the JAR that provides org.elasticsearch.spark.sql.
2.1 Download the required JAR
Writing data to ES from PySpark depends on elasticsearch-spark-20_2.11.jar, which can be downloaded from the Maven repository.
Download: http://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20_2.11/6.4.0
2.2 Using it in PySpark
# encoding: utf-8
"""
Created by shirukai on 2018/8/31
"""
from pyspark.sql import SparkSession
if __name__ == '__main__':
    spark = SparkSession.builder.appName("pyspark2es").getOrCreate()
    data = [
        (1, "Kafka"),
        (2, "Spark"),
        (3, "Hadoop")
    ]
    df = spark.createDataFrame(data, schema=['id', 'name'])
    df.write.format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "192.168.1.196:9200") \
        .mode("overwrite") \
        .save("pyspark2es/python")
    spark.stop()
Submit the job with spark-submit:
bin/spark-submit --master local --jars /Users/shirukai/apps/spark-2.3.1/jars/elasticsearch-spark-20_2.11-6.4.0.jar /Users/shirukai/apps/spark-2.3.1/script/pyspark2es.py
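Alternatively, instead of pointing --jars at a local copy of the connector, spark-submit can resolve it from Maven with --packages (the coordinates below match the artifact linked in 2.1):
bin/spark-submit --master local --packages org.elasticsearch:elasticsearch-spark-20_2.11:6.4.0 /Users/shirukai/apps/spark-2.3.1/script/pyspark2es.py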