1. 程式人生 > 其它 >python訪問elasticsearch_關於spark dataframe資料匯入elasticsearch

python訪問elasticsearch_關於spark dataframe資料匯入elasticsearch

技術標籤:python訪問elasticsearch

首先elasticsearch是什麼?

elasticsearch 的功能如其名字,是彈性搜尋資料庫。簡單來說就是一個搜尋引擎,你可以把它理解為一個小百度。spark是大資料框架,或者說工具。當然elasticsearch也是支援分散式的。

我們有很多格式化的資料是存在分散式的叢集上也就是hdfs上的,用spark能夠將這些資料匯出出來。

elasticsearch 5以後有了一個工具包elasticsearch-hadoop 支援hadoop與elasticsearch的資料互動。如果不用這個工具的話,就得通過elasticsearch自己的介面或者http請求來進行資料的轉存。相對麻煩,也不夠優雅。

有了elasticsearch-hadoop以後,就能夠很方便的將資料從hadoop匯入到elasticsearch上。

為了完成這一目標,首先我們需要

elasticsearch-hadoop-6.1.1.zip這個包,下載地址在這裡

對大資料的即時洞察 | Elastic​www.elastic.co 1b34abcd7d273ff1ed5750a8744116fa.png

如果使用jupyter的話,需要加上這個配置

import os
# set environment variable PYSPARK_SUBMIT_ARGS
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars elasticsearch-hadoop-6.1.1/dist/elasticsearch-spark-20_2.11-6.1.1.jar pyspark-shell'

接下來,我們需要配置好spark調取dataframe的工具,也就是sparksession

from pyspark.sql import SparkSession
sparkSession = SparkSession 
    .builder 
    .appName("es_books") 
    .master("local") 
    .enableHiveSupport() 
    .getOrCreate()

接下來就是提數啦

df = sparkSession.sql()

然後我們需要把df轉換成為elasticsearch適合的儲存格式

import json
def format_data(row):
    return (row['id'], row['content'])
transed_data = df.rdd.map(format_data)

當然es配置也是必要的

es_write_conf = {

# specify the node that we are sending data to (this should be the master)
"es.nodes" : '127.0.0.1',

# specify the port in case it is not the default port
"es.port" : '9200',

# specify a resource in the form 'index/doc-type'
"es.resource" : 'index/doc_type',

# is the input JSON?
"es.input.json" : "yes",

# is there a field in the mapping that should be used to specify the ES document ID
"es.mapping.id": "id"

}

然後最重要的步驟來了

rdd_transed.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)

然後,你就可以用python訪問es啦

from elasticsearch import Elasticsearch

es = Elasticsearch(["127.0.0.1"])
body = '{"query" : {"match" : {"content" : "hello es"} }}'
# 獲取索引為my_index,文件型別為test_type的所有資料,result為一個字典型別
result = es.search(index="index",doc_type="doc_type", body=body)

本文假定你已經在es裡面設定過了相應type的mapping。