Reading a CSV File with Spark and Saving It to Elasticsearch

0. Prerequisites:

Spark version: 1.6.0
ES version: 5.5.2
JDK version: 1.8

Test data: Book1.csv

age,sex,no,address,phone,qq,birthday
12,男,1,斯蒂芬斯蒂芬,122,444,2017-1-12
13,男,2,斯蒂芬斯蒂芬,123,445,2017-1-12
14,男,3,斯蒂芬斯蒂芬,124,446,2017-1-12
15,男,4,斯蒂芬斯蒂芬,125,447,2017-1-12
16,男,5,斯蒂芬斯蒂芬,126,448,2017-1-12
17,女,6,斯蒂芬斯蒂芬,127,449,2017-1-12
18,女,7,斯蒂芬斯蒂芬,128,450,2017-1-12
19,女,8,斯蒂芬斯蒂芬,129,451,2017-1-12
20,女,9,斯蒂芬斯蒂芬,130,452,2017-1-12
21,女,10,斯蒂芬斯蒂芬,131,453,2017-1-12
22,女,11,斯蒂芬斯蒂芬,132,454,2017-1-12
23,女,12,斯蒂芬斯蒂芬,133,455,2017-1-12
24,女,13,斯蒂芬斯蒂芬,134,456,2017-1-12
25,女,14,斯蒂芬斯蒂芬,135,457,2017-1-12
26,女,15,斯蒂芬斯蒂芬,136,458,2017-1-12
27,男,16,斯蒂芬斯蒂芬,137,459,2017-1-12
28,男,17,斯蒂芬斯蒂芬,138,460,2017-1-12
29,男,18,斯蒂芬斯蒂芬,139,461,2017-1-12
30,男,19,斯蒂芬斯蒂芬,140,462,2017-1-12
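
Once indexed, each data row becomes one ES document whose keys are the header columns and whose values are all strings (the job below builds a Map<String, String> per row). The first row, for instance, should come out roughly as:

{"age":"12","sex":"男","no":"1","address":"斯蒂芬斯蒂芬","phone":"122","qq":"444","birthday":"2017-1-12"}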

1. pom.xml configuration:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.databricks/spark-csv -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-13 -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-13_2.11</artifactId>
    <version>5.5.2</version>
</dependency>
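
The code below also uses org.apache.commons.csv classes directly. spark-csv pulls commons-csv in transitively, so the build above should work as-is, but you can also declare it explicitly (the version shown is an assumption; match whatever spark-csv resolves):

<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-csv</artifactId>
    <version>1.1</version>
</dependency>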

2. CSVToES.java:

import com.databricks.spark.csv.util.TextFile;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.elasticsearch.spark.rdd.EsSpark;

import java.io.Serializable;

import java.util.*;

public class CSVToES implements Serializable {

    /**
     * Saves a CSV file to Elasticsearch via a Spark job.
     * @author lw
     * @date 2018-02-28
     */

    // CSV file encoding
    private String charset = "UTF-8";

    private List<String> csvSchemaColumns = null;

    public int running() throws Exception {

        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("CSVToES");
        // let the connector create the target index if it does not exist yet
        conf.set("es.index.auto.create", "true");
        conf.set("es.nodes", "92.16.8.11");
        conf.set("es.port", "9200");
        SparkContext sc = new SparkContext(conf);

        RDD<String> inputRDD = TextFile.withCharset(sc, "D:\\data\\Book1.csv", charset);

        final String header = inputRDD.first(); // the first line holds the column names

        // parse the header row to get the CSV schema (column names)
        CSVRecord csvRecord = CSVParser.parse(header, CSVFormat.DEFAULT).getRecords().get(0);
        csvSchemaColumns = new ArrayList<String>();

        Iterator<String> iterator = csvRecord.iterator();
        for (int i = 0; iterator.hasNext(); i++) {
            csvSchemaColumns.add(i, iterator.next());
        }
        // drop the header line, then turn every data line into a column-name -> value map
        JavaRDD<Map<String, String>> rdd = inputRDD.toJavaRDD()
                .filter(new Function<String, Boolean>() {
                    public Boolean call(String line) throws Exception {
                        // keep only the data lines
                        return !line.equals(header);
                    }
                })
                .map(new Function<String, Map<String, String>>() {
                    public Map<String, String> call(String line) throws Exception {
                        Map<String, String> doc = new HashMap<String, String>();
                        String[] fields = line.split(",");
                        for (int i = 0; i < csvSchemaColumns.size(); i++) {
                            doc.put(csvSchemaColumns.get(i), fields[i]);
                        }
                        return doc;
                    }
                });

        // write the documents to index "spark", type "docs4"
        EsSpark.saveToEs(rdd.rdd(), "spark/docs4");

        return 0;
    }


    public static void main(String[] args) throws Exception {
        CSVToES csvToES = new CSVToES();
        System.exit(csvToES.running());
    }
}
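
To verify the write programmatically, here is a minimal read-back sketch using JavaEsSpark, which ships in the same elasticsearch-spark artifact declared above (the class name VerifyCSVToES is illustrative):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import java.util.Map;

public class VerifyCSVToES {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("VerifyCSVToES")
                .set("es.nodes", "92.16.8.11")
                .set("es.port", "9200");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // read back every document under index "spark", type "docs4"
        JavaPairRDD<String, Map<String, Object>> esRDD =
                JavaEsSpark.esRDD(jsc, "spark/docs4");
        // should print 19 for the Book1.csv sample above
        System.out.println("document count: " + esRDD.count());

        jsc.stop();
    }
}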

The Spark job runs in local mode and reads local test data. Once it finishes, if Kibana is installed you can check the results in its UI.

Feedback on anything unclear or incorrect is welcome!