Word Frequency Counting with Spark and Ansj Segmentation
Word frequency counting with Spark
I've recently been working on a project that needs word-frequency statistics over a large amount of text, and the usual single-machine approach was too slow, so I tried doing the computation with Spark. The rough pipeline is: a crawler scrapes JD.com review data into MongoDB; I pull the data out of MongoDB and upload it to HDFS; Spark then reads it back from HDFS and computes the word frequencies, with Ansj handling the Chinese word segmentation.
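Before the full listing, here is a minimal, self-contained sketch of the Ansj call the job relies on, so the segmentation step is clear in isolation. It assumes an ansj_seg release with the same API as the code below, where ToAnalysis.parse(String) returns a List<Term> (newer 5.x releases return a Result that is likewise iterable over Term); the sample sentence is purely illustrative.

import java.util.List;

import org.ansj.domain.Term;
import org.ansj.splitWord.analysis.ToAnalysis;

public class AnsjDemo {
    public static void main(String[] args) {
        // Segment a sample review sentence into terms.
        List<Term> terms = ToAnalysis.parse("這款筆記本的屏幕很清晰,鍵盤手感也不錯");
        for (Term term : terms) {
            // getName() is the surface form of the token; these are what the Spark job counts.
            System.out.println(term.getName());
        }
    }
}

The full pipeline code follows: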
package com.aawant.respository.util;
import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.ansj.domain.Term;
import org.ansj.splitWord.analysis.ToAnalysis;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SQLContext;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import scala.Tuple2;
import com.aawant.mongo.repo.BaseMongoRepository;
import com.aawant.util.StringUtils;
import com.mongodb.BasicDBList;
import com.mongodb.CommandResult;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;
public class JingDongRespository {

    public static BaseMongoRepository base = new BaseMongoRepository("test");

    /*
     * Pull the distinct product types from the JD data in MongoDB.
     */
    public static List<String> getDistinctResult() {
        MongoClient m1 = new MongoClient("192.168.1.63:27017");
        @SuppressWarnings("deprecation")
        DB db = m1.getDB("test");
        DBCollection coll = db.getCollection("commodity");
        @SuppressWarnings("rawtypes")
        List list = coll.distinct("type");
        return list;
        // BasicDBList list = (BasicDBList) result.get("values");
        // for (int i = 0; i < list.size(); i++) {
        //     System.out.println(list.get(i));
        // }
    }
    /*
     * List all product ids (_id) for a given product type.
     */
    public static List<String> listAllJdId(String typename) {
        List<String> idList = new ArrayList<String>();
        Query query = null;
        if (StringUtils.isNotEmpty(typename)) {
            query = Query.query(new Criteria().where("type").is(typename));
        }
        List<Map> result = base.doQuery(query, Map.class, "commodity");
        if (result != null && result.size() > 0) {
            for (Map map : result) {
                idList.add(map.get("_id").toString());
            }
        }
        return idList;
    }
    /*
     * Fetch the review texts for a product id.
     */
    public static List<String> getContentById(String j_id) {
        Query query = null;
        List<String> contentList = new ArrayList<String>();
        if (StringUtils.isNotEmpty(j_id)) {
            query = Query.query(new Criteria().where("j_id").is(j_id));
        }
        List<Map> mapResult = base.doQuery(query, Map.class,
                "commodity_appraisal");
        if (mapResult != null && mapResult.size() > 0) {
            for (Map map : mapResult) {
                String content = map.get("content").toString();
                if (StringUtils.isNotEmpty(content.trim())) {
                    contentList.add(content);
                }
            }
        }
        return contentList;
    }
    /*
     * Fetch question/answer pairs for a product id.
     */
    public static List<String> getQaById(String j_id) {
        Query query = null;
        List<String> sentenceList = new ArrayList<String>();
        if (StringUtils.isNotEmpty(j_id)) {
            query = Query.query(new Criteria().where("j_id").is(j_id));
        }
        List<Map> mapList = base.doQuery(query, Map.class, "commodity_qa");
        if (mapList != null && mapList.size() > 0) {
            for (Map map : mapList) {
                String q = map.get("q").toString();
                String a = map.get("a").toString();
                if (StringUtils.isNotEmpty(q.trim()) && q.trim().length() < 250) {
                    sentenceList.add(q);
                }
                if (StringUtils.isNotEmpty(a.trim()) && a.trim().length() < 250) {
                    sentenceList.add(a);
                }
            }
        }
        return sentenceList;
    }
    /*
     * Upload the review data to HDFS, then run the Spark word count for each product type.
     */
    public static void sendDataToHdfs() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.81:8020");
        FileSystem fs = FileSystem.get(conf);
        SparkConf conf1 = new SparkConf().setAppName("test").setMaster("local");
        List<String> typeNameList = JingDongRespository.getDistinctResult();
        if (typeNameList != null && typeNameList.size() > 0) {
            for (String typename : typeNameList) {
                // Collect every review for this product type into one local text file.
                List<String> contentList = new ArrayList<String>();
                List<String> idList = JingDongRespository.listAllJdId(typename);
                for (String j_id : idList) {
                    List<String> typeSenList = JingDongRespository.getContentById(j_id);
                    contentList.addAll(typeSenList);
                }
                for (String str : contentList) {
                    StringUtils.string2File(str + "\r\n", typename + ".txt", "utf-8", true);
                }
                System.out.print("ok");
                System.out.print("./" + typename + ".txt");
                // Push the local file to HDFS so Spark can read it.
                fs.copyFromLocalFile(new Path("./" + typename + ".txt"), new Path("/" + typename + ".txt"));
                JavaSparkContext sc = new JavaSparkContext(conf1);
                String url = "hdfs://192.168.1.81:8020/" + typename + ".txt";
                JavaRDD<String> lines = sc.textFile(url);
                // Segment every review line with Ansj and emit the individual tokens.
                JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                    public Iterable<String> call(String s) {
                        List<String> list = new ArrayList<String>();
                        List<Term> terms = ToAnalysis.parse(s);
                        for (Term term : terms) {
                            list.add(term.getName());
                        }
                        return list;
                    }
                });
                // Classic word count: map each token to (token, 1), then reduce by key.
                JavaPairRDD<String, Integer> ones = words.mapToPair(
                        new PairFunction<String, String, Integer>() {
                            public Tuple2<String, Integer> call(String s) {
                                return new Tuple2<>(s, 1);
                            }
                        });
                JavaPairRDD<String, Integer> counts = ones.reduceByKey(
                        new Function2<Integer, Integer, Integer>() {
                            public Integer call(Integer i1, Integer i2) {
                                return i1 + i2;
                            }
                        });
                // Write one "token: count" line per word to a local result file.
                List<Tuple2<String, Integer>> output = counts.collect();
                for (Tuple2<?, ?> tuple : output) {
                    StringUtils.string2File(tuple._1().toString() + ": " + tuple._2().toString() + "\r\n", "E:/分詞/data/" + typename + ".txt", "utf-8", true);
                    System.out.println(tuple._1() + ": " + tuple._2());
                }
                // logger.info("the end time is: " + System.currentTimeMillis());
                long endTime = System.currentTimeMillis();
                // long useTime = (endTime - startTime)/1000;
                // logger.info("use time: " + useTime);
                sc.stop();
            }
        }
    }
    public static void TestFile() throws IOException {
        BufferedReader br = StringUtils.getBr("./筆記本.txt", "utf-8");
        String line = "";
        while ((line = br.readLine()) != null) {
            System.out.print(line);
        }
    }

    public static void main(String args[]) throws IOException {
        // JingDongRespository.getDistinctResult();
        JingDongRespository.sendDataToHdfs();
        // JingDongRespository.TestFile();
    }
}
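A note on the structure: the listing above creates and stops a new JavaSparkContext inside the per-type loop, which adds noticeable startup overhead when there are many product types; a single context can be created once and reused for every file. As a rough illustration, here is a compact sketch of just the counting stage with one shared context and Java 8 lambdas. It targets the same Spark 1.x Java API as the code above (the flatMap lambda returns an Iterable; Spark 2.x expects an Iterator instead), the HDFS path is the one used in this post, and the typenames array is a hypothetical stand-in for the list pulled from MongoDB.

import java.util.ArrayList;
import java.util.List;

import org.ansj.domain.Term;
import org.ansj.splitWord.analysis.ToAnalysis;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("jd-wordcount").setMaster("local");
        // One context, created once and reused for every product type.
        JavaSparkContext sc = new JavaSparkContext(conf);
        String[] typenames = { "筆記本" }; // hypothetical list of product types to process
        for (String typename : typenames) {
            JavaRDD<String> lines = sc.textFile("hdfs://192.168.1.81:8020/" + typename + ".txt");
            // Segment each review with Ansj and emit its tokens.
            JavaRDD<String> words = lines.flatMap(s -> {
                List<String> tokens = new ArrayList<String>();
                for (Term term : ToAnalysis.parse(s)) {
                    tokens.add(term.getName());
                }
                return tokens;
            });
            // Classic word count: (token, 1) pairs reduced by key.
            JavaPairRDD<String, Integer> ones = words.mapToPair(w -> new Tuple2<>(w, 1));
            JavaPairRDD<String, Integer> counts = ones.reduceByKey((a, b) -> a + b);
            for (Tuple2<String, Integer> t : counts.collect()) {
                System.out.println(typename + "\t" + t._1() + ": " + t._2());
            }
        }
        sc.stop();
    }
}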