Python如何把Spark資料寫入ElasticSearch
這裡以將Apache的日誌寫入到ElasticSearch為例,來演示一下如何使用Python將Spark資料匯入到ES中。
實際工作中,由於資料與使用框架或技術的複雜性,資料的寫入變得比較複雜,在這裡我們簡單演示一下。
如果使用Scala或Java的話,Spark提供自帶了支援寫入ES的支援庫,但Python不支援。所以首先你需要去這裡下載依賴的ES官方開發的依賴包包。
下載完成後,放在本地目錄,以下面命令方式啟動pyspark:
pyspark --jars elasticsearch-hadoop-6.4.1.jar
如果你想pyspark使用Python3,請設定環境變數:
export PYSPARK_PYTHON=/usr/bin/python3
{ "id: { the rest of your json}}
往下會展示如何轉換成這種格式。
解析Apache日誌檔案
我們將Apache的日誌檔案讀入,構建Spark RDD。然後我們寫一個parse()函式用正則表示式處理每條日誌,提取我們需要的字
rdd = sc.textFile("/home/ubuntu/walker/apache_logs")
regex='^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'
p=re.compile(regex) def parse(str): s=p.match(str) d = {} d['ip']=s.group(1) d['date']=s.group(4) d['operation']=s.group(5) d['uri']=s.group(6) return d
換句話說,我們剛開始從日誌檔案讀入RDD的資料類似如下:
['83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML,like Gecko) Chrome/32.0.1700.77 Safari/537.36"']
然後我們使用map函式轉換每條記錄:
rdd2 = rdd.map(parse)
rdd2.take(1)
[{'date': '17/May/2015:10:05:03 +0000','ip': '83.149.9.216','operation': 'GET','uri': '/presentations/logstash-monitorama-2013/images/kibana-search.png'}]
現在看起來像JSON,但並不是JSON字串,我們需要使用json.dumps將dict物件轉換。
我們同時增加一個doc_id欄位作為整個JSON的ID。在配置ES中我們增加如下配置“es.mapping.id”: “doc_id”告訴ES我們將這個欄位作為ID。
這裡我們使用SHA演算法,將這個JSON字串作為引數,得到一個唯一ID。
計算結果類似如下,可以看到ID是一個很長的SHA數值。
rdd3.take(1)
[('a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c','{"date": "17/May/2015:10:05:03 +0000","ip": "83.149.9.216","operation": "GET","doc_id": "a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c","uri": "/presentations/logstash-monitorama-2013/images/kibana-search.png"}')]
現在我們需要制定ES配置,比較重要的兩項是:
- “es.resource” : ‘walker/apache': "walker"是索引,apache是型別,兩者一般合稱索引
- “es.mapping.id”: “doc_id”: 告訴ES那個欄位作為整個文件的ID,也就是查詢結果中的_id
其他的配置自己去探索。
然後我們使用saveAsNewAPIHadoopFile()將RDD寫入到ES。這部分程式碼對於所有的ES都是一樣的,比較固定,不需要理解每一個細節
es_write_conf = { "es.nodes" : "localhost","es.port" : "9200","es.resource" : 'walker/apache',"es.input.json": "yes","es.mapping.id": "doc_id" } rdd3.saveAsNewAPIHadoopFile( path='-',outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",keyClass="org.apache.hadoop.io.NullWritable",valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",conf=es_write_conf) rdd3 = rdd2.map(addID) def addId(data): j=json.dumps(data).encode('ascii','ignore') data['doc_id'] = hashlib.sha224(j).hexdigest() return (data['doc_id'],json.dumps(data))
最後我們可以使用curl進行查詢
curl http://localhost:9200s/walker/apache/_search?pretty=true&?q=* { "_index" : "walker","_type" : "apache","_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2","_score" : 1.0,"_source" : { "date" : "17/May/2015:10:05:32 +0000","ip" : "91.177.205.119","operation" : "GET","doc_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2","uri" : "/favicon.ico" }
如下是所有程式碼:
import json import hashlib import re def addId(data): j=json.dumps(data).encode('ascii',json.dumps(data)) def parse(str): s=p.match(str) d = {} d['ip']=s.group(1) d['date']=s.group(4) d['operation']=s.group(5) d['uri']=s.group(6) return d regex='^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$' p=re.compile(regex) rdd = sc.textFile("/home/ubuntu/walker/apache_logs") rdd2 = rdd.map(parse) rdd3 = rdd2.map(addID) es_write_conf = { "es.nodes" : "localhost","es.mapping.id": "doc_id" } rdd3.saveAsNewAPIHadoopFile( path='-',conf=es_write_conf)
也可以這麼封裝,其實原理是一樣的
import hashlib import json from pyspark import Sparkcontext def make_md5(line): md5_obj=hashlib.md5() md5_obj.encode(line) return md5_obj.hexdigest() def parse(line): dic={} l = line.split('\t') doc_id=make_md5(line) dic['name']=l[1] dic['age'] =l[2] dic['doc_id']=doc_id return dic #記得這邊返回的是字典型別的,在寫入es之前要記得dumps def saveData2es(pdd,es_host,port,index,index_type,key): """ 把saprk的執行結果寫入es :param pdd: 一個rdd型別的資料 :param es_host: 要寫es的ip :param index: 要寫入資料的索引 :param index_type: 索引的型別 :param key: 指定文件的id,就是要以文件的那個欄位作為_id :return: """ #例項es客戶端記得單例模式 if es.exist.index(index): es.index.create(index,'spo') es_write_conf = { "es.nodes": es_host,"es.port": port,"es.resource": index/index_type,"es.mapping.id": key } (pdd.map(lambda _dic: ('',json.dumps(_dic)))) #這百年是為把這個資料構造成元組格式,如果傳進來的_dic是字典則需要jdumps,如果傳進來之前就已經dumps,這便就不需要dumps了 .saveAsNewAPIHadoopFile( path='-',conf=es_write_conf) ) if __name__ == '__main__': #例項化sp物件 sc=Sparkcontext() #檔案中的呢內容一行一行用sc的讀取出來 json_text=sc.textFile('./1.txt') #進行轉換 json_data=json_text.map(lambda line:parse(line)) saveData2es(json_data,'127.0.01','9200','index_test','index_type','doc_id') sc.stop()
看到了把,面那個例子在寫入es之前加了一個id,返回一個元組格式的,現在這個封裝指定_id就會比較靈活了
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。