ES-Hadoop學習之ES和HDFS資料交換
ES作為強大的搜尋引擎,HDFS是分散式檔案系統。ES可以將自身的Document匯入到HDFS中用作備份,ES也可以將儲存在HDFS上的結構化檔案匯入為ES的中的Document。而ES-Hadoop正是這兩者之間的一個connector
1,將資料從ES匯出到HDFS
1.1,資料準備,在ES中建立Index和Type,並建立document。在我的例子中,Index是mydata,type是person,建立了兩條如下圖所示的document
1.2 在專案中引入ES-Hadoop庫
<dependency> <groupId>org.elasticsearch</groupId><artifactId>elasticsearch-hadoop</artifactId> <version>5.5.2</version> </dependency>
值得注意的是,上面的dependency只會引入ES-Hadoop相關的Jar包,和Hadoop相關的包,例如hadoop-common, hadoop-hdfs等等,依然還需要新增依賴。
1.3,建立從ES到Hadoop的資料遷移的Mapper類
package com.wjm.es_hadoop.example1; import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper; import org.elasticsearch.hadoop.mr.LinkedMapWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; class E2HMapper01 extends Mapper<Text, LinkedMapWritable, Text, LinkedMapWritable> { private static final Logger LOG= LoggerFactory.getLogger(E2HMapper01.class); @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); } @Override protected void map(Text key, LinkedMapWritable value, Context context) throws IOException, InterruptedException { LOG.info("key {} value {}", key, value); context.write(key, value); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } }
這個Mapper非常簡單,它並沒有對從ES獲取的資料進行任何的處理,只是寫到了context中。map方法中,引數key的值,就是ES中document的id的值,引數value是一個LinkedMapWritable,它包含的就是一個document的內容。只是在這個mapper中,我們沒有處理document,而是直接輸出。
1.4,建立從ES到Hadoop的資料遷移的Job類
package com.wjm.es_hadoop.example1; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.elasticsearch.hadoop.mr.EsInputFormat; import org.elasticsearch.hadoop.mr.LinkedMapWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class E2HJob01 { private static Logger LOG = LoggerFactory.getLogger(E2HJob01.class); public static void main(String[] args) { try { Configuration conf = new Configuration(); conf.setBoolean("mapred.map.tasks.speculative.execution", false); conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); //ElasticSearch節點 conf.set("es.nodes", "192.168.8.194:9200"); //ElaticSearch Index/Type conf.set("es.resource", "mydata/person/"); if (args.length != 1) { LOG.error("error : " + args.length); System.exit(2); } Job job = Job.getInstance(conf, "JOBE2H01"); job.setJarByClass(E2HJob01.class); job.setInputFormatClass(EsInputFormat.class); job.setMapperClass(E2HMapper01.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LinkedMapWritable.class); FileOutputFormat.setOutputPath(job, new Path(args[0])); System.out.println(job.waitForCompletion(true)); } catch (Exception e) { LOG.error(e.getMessage(), e); } } }
這個Job有兩點需要注意一下:
1,它沒有reducer,因為就是資料的透傳,不需要reduce過程。
2,InputFormatClass被設定為EsInputFormat,正是這個類,負責將從ES讀出的資料,轉換成mapper的輸入引數(Text,LinkedMapWritable)
1.5,打包執行
以下面的命令來啟動MapReduce任務:
hadoop jar es-hadoop-1.0-SNAPSHOT.jar com.wjm.es_hadoop.example1.E2HJob01 hdfs://bigdata-191:8020/wangjinming
執行完這個命令之後,看到/wangjinming目錄下面產生了檔案
檢視其中一個檔案,會發現資料被分為兩列,第一列為id,第二列為document的內容
另外,在執行hadoop jar命令的時候,需要把es-hadoop的jar包放到hadoop jar能訪問到的classpath下面。我查了一些方法都沒成功,最後使用了一個笨方法,用hadoop classpath方法檢視hadoop的classpath有哪些,然後將es-hadoop相關的jar包copy到其中一個目錄下。
2,將資料從HDFS中匯入到ES中。
2.1,資料準備。建立下面的這樣一個檔案並put到hdfs檔案系統中(我放在hdfs://bigdata-191:8020/input/perosn)
{"id":"3", "name":"jerry", "age":"23", "info":"hello hadoop"}
{"id":"4", "name":"russell", "age":"15", "info":"hello elasticsearch"}
2.2,Mapper編寫
package com.wjm.es_hadoop.example1; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; class H2EMapper01 extends Mapper<LongWritable, Text, NullWritable, Text> { @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); } @Override public void run(Context context) throws IOException, InterruptedException { super.run(context); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(NullWritable.get(), value); } @Override protected void cleanup(Context context) throws IOException,InterruptedException { super.cleanup(context); } }
這個Mapper也很簡單,只是把從HDFS中讀取到的資料透傳給ES。因為Mapper的input是一個HDFS檔案,所以,mapper的入參跟其他從hdfs多資料的mapper沒有任何區別。寫入到context的是,入參的key值是沒有意義的,所以忽略掉,直接把Text型別的value寫入到context就可以了。
2.3,編寫job
package com.wjm.es_hadoop.example1; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.elasticsearch.hadoop.mr.EsOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class H2EJob01 { private static Logger LOG = LoggerFactory.getLogger(H2EJob01.class); public static void main(String args[]) { try { Configuration conf = new Configuration(); conf.setBoolean("mapred.map.tasks.speculative.execution", false); conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); //ElasticSearch節點 conf.set("es.nodes", "192.168.8.194:9200"); //ElaticSearch Index/Type conf.set("es.resource", "mydata/person/"); //Hadoop上的資料格式為JSON,可以直接匯入 conf.set("es.input.json", "yes"); conf.set("es.mapping.id", "id"); if (args.length != 1) { LOG.error("error : " + args.length); System.exit(2); } Job job = Job.getInstance(conf, "51JOBH2E"); job.setJarByClass(H2EJob01.class); job.setMapperClass(H2EMapper01.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputFormatClass(EsOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); System.out.println(job.waitForCompletion(true)); } catch (Exception e) { LOG.error(e.getMessage(), e); } } }
這個Job有幾個需要注意的地方
es.input.json引數設定為true告訴ES-Hadoop,mapper輸出的結果是一個json格式的Text。
es.mapping.id引數指定json物件中那種field對應的值為es中document的id
OutputFormatClass被設定為EsOutputFormat,正是這個類負責將MapReduce的輸出結果(一個json格式的Text)轉換為ES的ID和document的內容
2.4,執行命令
hadoop jar es-hadoop-1.0-SNAPSHOT.jar com.wjm.es_hadoop.example1.H2EJob01 hdfs://bigdata-191:8020/input/person
命令成功執行之後,可以通過ES的命令看到資料已經在ES中建立了相應的document