python訪問elasticsearch_關於spark dataframe資料匯入elasticsearch
阿新 • • 發佈:2021-01-08
首先elasticsearch是什麼?
elasticsearch 的功能如其名字,是彈性搜尋資料庫。簡單來說就是一個搜尋引擎,你可以把它理解為一個小百度。spark是大資料框架,或者說工具。當然elasticsearch也是支援分散式的。
我們有很多格式化的資料是存在分散式的叢集上也就是hdfs上的,用spark能夠將這些資料匯出出來。
elasticsearch 5以後有了一個工具包elasticsearch-hadoop 支援hadoop與elasticsearch的資料互動。如果不用這個工具的話,就得通過elasticsearch自己的介面或者http請求來進行資料的轉存。相對麻煩,也不夠優雅。
有了elasticsearch-hadoop以後,就能夠很方便的將資料從hadoop匯入到elasticsearch上。
為了完成這一目標,首先我們需要
elasticsearch-hadoop-6.1.1.zip這個包,下載地址在這裡
對大資料的即時洞察 | Elasticwww.elastic.co如果使用jupyter的話,需要加上這個配置
import os # set environment variable PYSPARK_SUBMIT_ARGS os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars elasticsearch-hadoop-6.1.1/dist/elasticsearch-spark-20_2.11-6.1.1.jar pyspark-shell'
接下來,我們需要配置好spark調取dataframe的工具,也就是sparksession
from pyspark.sql import SparkSession
sparkSession = SparkSession
.builder
.appName("es_books")
.master("local")
.enableHiveSupport()
.getOrCreate()
接下來就是提數啦
df = sparkSession.sql()
然後我們需要把df轉換成為elasticsearch適合的儲存格式
import json
def format_data(row):
return (row['id'], row['content'])
transed_data = df.rdd.map(format_data)
當然es配置也是必要的
es_write_conf = {
# specify the node that we are sending data to (this should be the master)
"es.nodes" : '127.0.0.1',
# specify the port in case it is not the default port
"es.port" : '9200',
# specify a resource in the form 'index/doc-type'
"es.resource" : 'index/doc_type',
# is the input JSON?
"es.input.json" : "yes",
# is there a field in the mapping that should be used to specify the ES document ID
"es.mapping.id": "id"
}
然後最重要的步驟來了
rdd_transed.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
然後,你就可以用python訪問es啦
from elasticsearch import Elasticsearch
es = Elasticsearch(["127.0.0.1"])
body = '{"query" : {"match" : {"content" : "hello es"} }}'
# 獲取索引為my_index,文件型別為test_type的所有資料,result為一個字典型別
result = es.search(index="index",doc_type="doc_type", body=body)
本文假定你已經在es裡面設定過了相應type的mapping。