一個基於Mahout與hadoop的聚類搭建
阿新 • • 發佈:2019-01-29
mahout是基於hadoop的資料探勘工具,因為有了hadoop,所以進行海量資料的挖掘工作顯得更為簡單。但是因為演算法需要支援M/R,所以不是所有常用的資料探勘演算法都會支援。這篇文章會告訴你,如何使用hadoop + mahout搭出一個簡易的聚類工具。
第一步:搭建hadoop平臺。
我使用的是ubuntu 11.04,如果沒有ubuntu的開發環境,就參考我的帖子《Ubuntu 10.10 java 開發環境》
#1 在ubuntu下面建立一個使用者組與使用者
Java程式碼
#2 安裝ssh-server
Java程式碼
#3 驗證ssh通訊
Java程式碼
ssh localhost 後,選擇 yes,如果沒有問題,就可以安裝hadoop了
#4 新增java_home
修改conf/hadoop-env.sh檔案,讓JAVA_HOME指向正確的地址
#5 修改下面的配置
conf/core-site.xml:
Java程式碼
conf/hdfs-site.xml:
Java程式碼
conf/mapred-site.xml:
Java程式碼
#6 Format a new distributed-filesystem:
Java程式碼
#7 Start the hadoop daemons:
Java程式碼
#8 驗證啟動成功沒有
Java程式碼
數一下有沒有6個,沒有的話,刪除logs下面的檔案,然後從#6開始
#9 別慌,先開啟網頁,打不開,等!!!
Java程式碼
第一步搭建hadoop結束
第二步,Mahout的配置
#1 下載Mahout,解壓
#2 .bash_profile裡面設定HADOOP_HOME
#3 mahout/bin/mahout 看看列印結果
第三步,做一個聚類的demo吧
我的聚類是文字 -> lucene index -> mahout -> clustering dumper
可以選擇的是 sequeneceFile -> mahout -> clustering dumper
我直接貼程式碼吧,用的是groovy,可能寫的不好
#1 text -> lucene index
Java程式碼
#2 lucene index -> mahout vector
Java程式碼
#3 mahout vector -> mahout canopy
Java程式碼
#4 mahout canopy -> mahout kmeans
Java程式碼
#5 mahout keamns -> 結果分析
Java程式碼
效果我就不展示了
第一步:搭建hadoop平臺。
我使用的是ubuntu 11.04,如果沒有ubuntu的開發環境,就參考我的帖子《Ubuntu 10.10 java 開發環境》
#1 在ubuntu下面建立一個使用者組與使用者
Java程式碼
- beneo@ubuntu:~$ sudo addgroup hadoop
-
beneo@ubuntu:~$ sudo adduser --ingroup hadoop hduser
#2 安裝ssh-server
Java程式碼
- beneo@ubuntu:~$ sudo apt-get install ssh
- beneo@ubuntu:~$ su - hduser
- hduser@ubuntu:~$ ssh-keygen -t rsa -P ""
- hduser@ubuntu:~$ cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys
#3 驗證ssh通訊
Java程式碼
- hduser@ubuntu:ssh localhost
ssh localhost 後,選擇 yes,如果沒有問題,就可以安裝hadoop了
#4 新增java_home
修改conf/hadoop-env.sh檔案,讓JAVA_HOME指向正確的地址
#5 修改下面的配置
conf/core-site.xml:
Java程式碼
- <configuration>
- <property>
- <name>fs.default.name</name>
- <value>hdfs://localhost:9000</value>
- </property>
- </configuration>
conf/hdfs-site.xml:
Java程式碼
- <configuration>
- <property>
- <name>dfs.replication</name>
- <value>1</value>
- </property>
- </configuration>
conf/mapred-site.xml:
Java程式碼
- <configuration>
- <property>
- <name>mapred.job.tracker</name>
- <value>localhost:9001</value>
- </property>
- </configuration>
#6 Format a new distributed-filesystem:
Java程式碼
- $ bin/hadoop namenode -format
#7 Start the hadoop daemons:
Java程式碼
- $ bin/start-all.sh
#8 驗證啟動成功沒有
Java程式碼
- $ jps
數一下有沒有6個,沒有的話,刪除logs下面的檔案,然後從#6開始
#9 別慌,先開啟網頁,打不開,等!!!
Java程式碼
- NameNode - http://localhost:50070/
- JobTracker - http://localhost:50030/
第一步搭建hadoop結束
第二步,Mahout的配置
#1 下載Mahout,解壓
#2 .bash_profile裡面設定HADOOP_HOME
#3 mahout/bin/mahout 看看列印結果
第三步,做一個聚類的demo吧
我的聚類是文字 -> lucene index -> mahout -> clustering dumper
可以選擇的是 sequeneceFile -> mahout -> clustering dumper
我直接貼程式碼吧,用的是groovy,可能寫的不好
#1 text -> lucene index
Java程式碼
- def assembleDoc = {
- label, content ->
- assert !label.toString().trim().empty
- assert !content.toString().trim().empty
- def doc = new Document()
- doc.add(new Field("label", label, Field.Store.YES, Field.Index.NOT_ANALYZED))
- doc.add(new Field("content", content, Field.Store.NO, Field.Index.ANALYZED, TermVector.YES))
- doc
- }
- def mockContent = {
- def set = []
- new File("""/home/beneo/text.txt""").newReader().eachLine {
- String line ->
- set << line
- }
- set
- }
- def mockExpandoSet = {
- def lst = []
- mockContent()?.each {
- content ->
- // 過濾掉所有非中文字元
- def line = content.replaceAll("[^\u4e00-\u9fa5]+", "")
- if (line != null && line.trim().length() > 2) {
- println(content)
- def expando = new Expando()
- expando.label = content
- expando.content = line
- lst << expando
- }
- }
- lst
- }
- //建立一個dic
- def directory = FSDirectory.open(new File("""/home/beneo/index"""), NoLockFactory.getNoLockFactory())
- // 用的是 IK分詞
- def analyzer = new IKAnalyzer()
- //建立一個indexWriter,這個wirter就是用來產生出index
- def indexWriter = new IndexWriter(directory, analyzer, true, IndexWriter.MaxFieldLength.UNLIMITED)
- //從本地獲得文字
- mockExpandoSet().each {
- expando ->
- indexWriter.addDocument(assembleDoc(expando.label, expando.content))
- }
- indexWriter.commit()
- indexWriter.close()
- directory.close()
#2 lucene index -> mahout vector
Java程式碼
- mahout/bin/mahout lucene.vector -d index/ -i label -o tmp/vector/vector -f content -t tmp/vector/dict -n 2
#3 mahout vector -> mahout canopy
Java程式碼
- mahout/bin/mahout canopy -i tmp/vector/vector -o tmp/canopy/ -dm org.apache.mahout.common.distance.CosineDistanceMeasure -t1 0.32 -t2 0.08 -ow
#4 mahout canopy -> mahout kmeans
Java程式碼
- mahout/bin/mahout kmeans -i tmp/vector/vector -c tmp/canopy/clusters-0/ -dm org.apache.mahout.common.distance.CosineDistanceMeasure -o tmp/kmeans/ -cd 0.001 -x 10 -ow -cl
#5 mahout keamns -> 結果分析
Java程式碼
- String seqFileDir = "/home/hduser/tmp/kmeans/clusters-2/"
- String pointsDir = "/home/hduser/tmp/kmeans/clusteredPoints/"
- def conf = new Configuration()
- FileSystem fs = new Path(seqFileDir).getFileSystem(conf)
- Map<Integer, List<WeightedVectorWritable>> clusterIdToPoints = readPoints(new Path(pointsDir), new Configuration());
- for (FileStatus seqFile: fs.globStatus(new Path(seqFileDir, "part-*"))) {
- Path path = seqFile.getPath()
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
- org.apache.hadoop.io.Writable key = reader.getKeyClass().asSubclass(org.apache.hadoop.io.Writable.class).newInstance();
- org.apache.hadoop.io.Writable value = reader.getValueClass().asSubclass(org.apache.hadoop.io.Writable.class).newInstance();
- while (reader.next(key, value)) {
- Cluster cluster = (Cluster) value;
- int id = cluster.getId()
- int np = cluster.getNumPoints()
- List<WeightedVectorWritable> points = clusterIdToPoints.get(cluster.getId());
- if (points != null && points.size() > 4) {
- for (Iterator<WeightedVectorWritable> iterator = points.iterator(); iterator.hasNext();) {
- println(((NamedVector) iterator.next().getVector()).getName())
- }
- println "======================================"
- }
- }
- }
- private static Map<Integer, List<WeightedVectorWritable>> readPoints(Path pointsPathDir, Configuration conf)
- throws IOException {
- Map<Integer, List<WeightedVectorWritable>> result = new TreeMap<Integer, List<WeightedVectorWritable>>();
- FileSystem fs = pointsPathDir.getFileSystem(conf);
- FileStatus[] children = fs.listStatus(pointsPathDir, new PathFilter() {
- @Override
- public boolean accept(Path path) {
- String name = path.getName();
- return !(name.endsWith(".crc") || name.startsWith("_"));
- }
- });
- for (FileStatus file: children) {
- Path path = file.getPath();
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
- IntWritable key = reader.getKeyClass().asSubclass(IntWritable.class).newInstance();
- WeightedVectorWritable value = reader.getValueClass().asSubclass(WeightedVectorWritable.class).newInstance();
- while (reader.next(key, value)) {
- // value is the cluster id as an int, key is the name/id of the
- // vector, but that doesn't matter because we only care about printing
- // it
- // String clusterId = value.toString();
- List<WeightedVectorWritable> pointList = result.get(key.get());
- if (pointList == null) {
- pointList = new ArrayList<WeightedVectorWritable>();
- result.put(key.get(), pointList);
- }
- pointList.add(value);
- value = reader.getValueClass().asSubclass(WeightedVectorWritable.class).newInstance();
- }
- }
- return result;
- }
效果我就不展示了