
Word Frequency Counting with Spark and the Ansj Segmenter

Word frequency counting with Spark
I've recently been working on a project that needs word-frequency statistics over a large amount of text. The conventional approach turned out to be slow, so I tried doing the computation with Spark. The rough pipeline: a crawler scrapes JD.com review data into MongoDB; I pull the data out of MongoDB and upload it to HDFS; Spark then reads it back from HDFS and computes the word frequencies.
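Ansj does the Chinese tokenization; Spark only ever sees the resulting tokens. As a quick illustration, here is the segmenter in isolation (a minimal sketch; the sample sentence is made up. In older ansj_seg releases, ToAnalysis.parse returns a List<Term>, while newer releases return an iterable Result, so the for-each loop below compiles against either):

import org.ansj.domain.Term;
import org.ansj.splitWord.analysis.ToAnalysis;

public class AnsjDemo {
    public static void main(String[] args) {
        // Split one JD-style review sentence into terms and print each token.
        for (Term term : ToAnalysis.parse("這款筆記本的螢幕很清晰,物流也很快")) {
            System.out.println(term.getName());
        }
    }
}

With the segmenter working, the full pipeline code is as follows: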

package com.aawant.respository.util;

import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.ansj.domain.Term;
import org.ansj.splitWord.analysis.ToAnalysis;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;

import scala.Tuple2;

import com.aawant.mongo.repo.BaseMongoRepository;
import com.aawant.util.StringUtils;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;

public class JingDongRespository {

    public static BaseMongoRepository base = new BaseMongoRepository("test");

    /*
     * Pull the distinct product types of the JD data from MongoDB.
     */
    @SuppressWarnings({ "deprecation", "unchecked" })
    public static List<String> getDistinctResult() {
        MongoClient m1 = new MongoClient("192.168.1.63:27017");
        DB db = m1.getDB("test");
        DBCollection coll = db.getCollection("commodity");
        return coll.distinct("type");
    }

    /*
     * List the ids of all products of the given type.
     */
    public static List<String> listAllJdId(String typename) {
        List<String> idList = new ArrayList<String>();
        Query query = null;
        if (StringUtils.isNotEmpty(typename)) {
            query = Query.query(Criteria.where("type").is(typename));
        }
        List<Map> result = base.doQuery(query, Map.class, "commodity");
        if (result != null && result.size() > 0) {
            for (Map map : result) {
                idList.add(map.get("_id").toString());
            }
        }
        return idList;
    }

    /*
     * Fetch the review texts for one product id.
     */
    public static List<String> getContentById(String j_id) {
        Query query = null;
        List<String> contentList = new ArrayList<String>();
        if (StringUtils.isNotEmpty(j_id)) {
            query = Query.query(Criteria.where("j_id").is(j_id));
        }
        List<Map> mapResult = base.doQuery(query, Map.class, "commodity_appraisal");
        if (mapResult != null && mapResult.size() > 0) {
            for (Map map : mapResult) {
                String content = map.get("content").toString();
                if (StringUtils.isNotEmpty(content.trim())) {
                    contentList.add(content);
                }
            }
        }
        return contentList;
    }

    /*
     * Fetch the question/answer pairs for one product id, skipping
     * over-long sentences.
     */
    public static List<String> getQaById(String j_id) {
        Query query = null;
        List<String> sentenceList = new ArrayList<String>();
        if (StringUtils.isNotEmpty(j_id)) {
            query = Query.query(Criteria.where("j_id").is(j_id));
        }
        List<Map> mapList = base.doQuery(query, Map.class, "commodity_qa");
        if (mapList != null && mapList.size() > 0) {
            for (Map map : mapList) {
                String q = map.get("q").toString();
                String a = map.get("a").toString();
                if (StringUtils.isNotEmpty(q.trim()) && q.trim().length() < 250) {
                    sentenceList.add(q);
                }
                if (StringUtils.isNotEmpty(a.trim()) && a.trim().length() < 250) {
                    sentenceList.add(a);
                }
            }
        }
        return sentenceList;
    }

    /*
     * Upload the review data to HDFS, one file per product type, then run
     * the Spark word count over each file.
     */
    public static void sendDataToHdfs() throws IOException {
        Configuration conf = new Configuration();
        // "fs.default.name" is the old spelling of "fs.defaultFS"; it still works.
        conf.set("fs.default.name", "hdfs://192.168.1.81:8020");
        FileSystem fs = FileSystem.get(conf);
        SparkConf conf1 = new SparkConf().setAppName("test").setMaster("local");
        // One context for the whole job rather than one per product type.
        JavaSparkContext sc = new JavaSparkContext(conf1);
        List<String> typeNameList = JingDongRespository.getDistinctResult();
        if (typeNameList != null && typeNameList.size() > 0) {
            for (String typename : typeNameList) {
                // Gather every review of this type into one local text file.
                List<String> contentList = new ArrayList<String>();
                List<String> idList = JingDongRespository.listAllJdId(typename);
                for (String j_id : idList) {
                    contentList.addAll(JingDongRespository.getContentById(j_id));
                }
                for (String str : contentList) {
                    StringUtils.string2File(str + "\r\n", typename + ".txt", "utf-8", true);
                }
                System.out.println("uploading ./" + typename + ".txt");
                fs.copyFromLocalFile(new Path("./" + typename + ".txt"),
                        new Path("/" + typename + ".txt"));
                String url = "hdfs://192.168.1.81:8020/" + typename + ".txt";
                JavaRDD<String> lines = sc.textFile(url);
                // Segment each line with Ansj and emit one token per word.
                JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                    public Iterable<String> call(String s) {
                        List<String> list = new ArrayList<String>();
                        List<Term> terms = ToAnalysis.parse(s);
                        for (Term term : terms) {
                            list.add(term.getName());
                        }
                        return list;
                    }
                });
                // Classic word count: (word, 1) pairs reduced by key.
                JavaPairRDD<String, Integer> ones = words.mapToPair(
                        new PairFunction<String, String, Integer>() {
                            public Tuple2<String, Integer> call(String s) {
                                return new Tuple2<String, Integer>(s, 1);
                            }
                        });
                JavaPairRDD<String, Integer> counts = ones.reduceByKey(
                        new Function2<Integer, Integer, Integer>() {
                            public Integer call(Integer i1, Integer i2) {
                                return i1 + i2;
                            }
                        });
                // Pull the counts back to the driver and write them to a local file.
                List<Tuple2<String, Integer>> output = counts.collect();
                for (Tuple2<?, ?> tuple : output) {
                    StringUtils.string2File(tuple._1() + ": " + tuple._2() + "\r\n",
                            "E:/分詞/data/" + typename + ".txt", "utf-8", true);
                    System.out.println(tuple._1() + ": " + tuple._2());
                }
            }
        }
        sc.stop();
    }

    /*
     * Quick check that a generated local file reads back correctly.
     */
    public static void TestFile() throws IOException {
        BufferedReader br = StringUtils.getBr("./筆記本.txt", "utf-8");
        String line = "";
        while ((line = br.readLine()) != null) {
            System.out.print(line);
        }
    }

    public static void main(String args[]) throws IOException {
        // JingDongRespository.getDistinctResult();
        JingDongRespository.sendDataToHdfs();
        // JingDongRespository.TestFile();
    }
}