Reading a CSV File with Spark and Saving It to ES
Published: 2019-01-07
0. Prerequisites:
Spark version: 1.6.0
ES version: 5.5.2
JDK version: 1.8
Test data: Book1.csv
age,sex,no,address,phone,qq,birthday
12,男,1,斯蒂芬斯蒂芬,122,444,2017-1-12
13,男,2,斯蒂芬斯蒂芬,123,445,2017-1-12
14,男,3,斯蒂芬斯蒂芬,124,446,2017-1-12
15,男,4,斯蒂芬斯蒂芬,125,447,2017-1-12
16,男,5,斯蒂芬斯蒂芬,126,448,2017-1-12
17,女,6,斯蒂芬斯蒂芬,127,449,2017-1-12
18,女,7,斯蒂芬斯蒂芬,128,450,2017-1-12
19,女,8,斯蒂芬斯蒂芬,129,451,2017-1-12
20,女,9,斯蒂芬斯蒂芬,130,452,2017-1-12
21,女,10,斯蒂芬斯蒂芬,131,453,2017-1-12
22,女,11,斯蒂芬斯蒂芬,132,454,2017-1-12
23,女,12,斯蒂芬斯蒂芬,133,455,2017-1-12
24,女,13,斯蒂芬斯蒂芬,134,456,2017-1-12
25,女,14,斯蒂芬斯蒂芬,135,457,2017-1-12
26,女,15,斯蒂芬斯蒂芬,136,458,2017-1-12
27,男,16,斯蒂芬斯蒂芬,137,459,2017-1-12
28,男,17,斯蒂芬斯蒂芬,138,460,2017-1-12
29,男,18,斯蒂芬斯蒂芬,139,461,2017-1-12
30,男,19,斯蒂芬斯蒂芬,140,462,2017-1-12
1. pom.xml configuration:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.databricks/spark-csv -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-13 -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-13_2.11</artifactId>
    <version>5.5.2</version>
</dependency>
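One note on this setup: the Java code in the next section also uses the Apache Commons CSV classes (CSVFormat, CSVParser, CSVRecord) to parse the header line. As far as I know, spark-csv already pulls commons-csv in as a transitive dependency, so the three dependencies above should be enough; if you would rather pin it explicitly, a dependency along these lines can be added (the version here is indicative, not taken from the original post):

<!-- Optional: declare commons-csv explicitly instead of relying on the
     transitive copy shipped with spark-csv (version is indicative) -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-csv</artifactId>
    <version>1.1</version>
</dependency>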
2. CSVToES.java:
import com.databricks.spark.csv.util.TextFile;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.elasticsearch.spark.rdd.EsSpark;
import scala.Serializable;

import java.util.*;

public class CSVToES implements Serializable {

    /**
     * @function Save a CSV file to ES via a Spark job
     * @author lw
     * @date 2018-02-28
     */

    // Character encoding of the CSV file
    private String charset = "UTF-8";
    private List<String> csvSchemaColumns = null;

    public int running() throws Exception {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("CSVToES");
        conf.set("es.index.auto.create", "true");
        conf.set("es.nodes", "92.16.8.11");
        conf.set("es.port", "9200");
        SparkContext sc = new SparkContext(conf);

        RDD<String> inputRDD = TextFile.withCharset(sc, "D:\\data\\Book1.csv", charset);

        // Read the header line and extract the CSV column schema from it
        String header = inputRDD.first();
        CSVRecord csvRecord = CSVParser.parse(header, CSVFormat.DEFAULT).getRecords().get(0);
        csvSchemaColumns = new ArrayList<String>();
        Iterator<String> iterator = csvRecord.iterator();
        for (int i = 0; iterator.hasNext(); i++) {
            csvSchemaColumns.add(i, iterator.next());
        }

        // Read the CSV line by line and turn each row into a column-name -> value map.
        // Note: String.split(",") does not handle quoted fields that contain commas.
        JavaRDD<Map<String, String>> rdd = inputRDD.toJavaRDD().map(new Function<String, Map<String, String>>() {
            public Map<String, String> call(String v1) throws Exception {
                HashMap<String, String> resmap = new HashMap<String, String>();
                String[] fields = v1.split(",");
                for (int i = 0; i < csvSchemaColumns.size(); i++) {
                    if (!csvSchemaColumns.get(i).equals(fields[i])) {
                        resmap.put(csvSchemaColumns.get(i), fields[i]);
                    } else {
                        // Value equals the column name, so this is the header row; mark it for filtering
                        resmap.put("null", "null");
                    }
                }
                return resmap;
            }
        // Filter out the header row marked above
        }).filter(new Function<Map<String, String>, Boolean>() {
            public Boolean call(Map<String, String> v1) throws Exception {
                return !"null".equals(v1.get("null"));
            }
        });

        // Write to index "spark", type "docs4"
        EsSpark.saveToEs(rdd.rdd(), "spark/docs4");
        return 0;
    }

    public static void main(String[] args) throws Exception {
        CSVToES csvToES = new CSVToES();
        System.exit(csvToES.running());
    }
}
The Spark job runs in local mode, and the test data is also on the local filesystem. After the job finishes, if Kibana is installed you can inspect the result in its UI; a Spark-based check is sketched below.
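If Kibana is not available, the write can also be verified from Spark itself. Here is a minimal sketch using JavaEsSpark, the Java wrapper shipped in the same elasticsearch-spark artifact; the class name CSVToESCheck is mine, not from the original post, and it assumes the same cluster settings as the writer job:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import java.util.Map;

public class CSVToESCheck {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("CSVToESCheck")
                .set("es.nodes", "92.16.8.11")   // same ES node as the writer job
                .set("es.port", "9200");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Read back everything written to index "spark", type "docs4";
        // each element is (document id, document fields as a Map)
        JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "spark/docs4");

        System.out.println("documents in spark/docs4: " + esRDD.count());
        // Print one document to eyeball the field mapping
        System.out.println(esRDD.first());

        jsc.stop();
    }
}

If the count matches the number of data rows in Book1.csv (19, excluding the header), the import worked.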
If anything here is unclear or incorrect, corrections and feedback are welcome!