
cascading--wordcount


Running WordCount in Eclipse, using the Cascading wrapper

Prerequisites: a CentOS system, JDK, Hadoop, Eclipse, the Cascading lib jars (available from the official site), the WordCount example source bundled with Cascading, and the crawl data directory (data). All of these can be downloaded from the official site.

I downloaded the materials from the Cascading website and ran the example in Eclipse to obtain the test results.

Difficulty: your Cascading version may not match the bundled WordCount example, in which case you have to adapt the code yourself; my Cascading jars were not downloaded from the official site.
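If your jars are a Cascading 2.x release while the bundled example targets 1.x, the usual adjustments are to the Hadoop-specific taps, schemes and the flow connector, which moved to new packages. The sketch below only illustrates the kind of changes typically involved; the 2.x package names are quoted from memory and the helper class name ConnectorSetup is made up for illustration, so verify everything against the jars actually on your classpath.

// A minimal sketch of the import/setup changes usually needed when compiling the
// 1.x-style example against Cascading 2.x jars. Package names are the Cascading 2.x
// locations as I recall them; verify against your own jars.
import java.util.Properties;

import cascading.flow.FlowConnector;
import cascading.flow.hadoop.HadoopFlowConnector;   // replaces "new FlowConnector(properties)"
import cascading.property.AppProps;                 // replaces FlowConnector.setApplicationJarClass
import cascading.scheme.hadoop.SequenceFile;        // was cascading.scheme.SequenceFile
import cascading.scheme.hadoop.TextLine;            // was cascading.scheme.TextLine
import cascading.tap.hadoop.Hfs;                    // was cascading.tap.Hfs
import cascading.tap.hadoop.Lfs;                    // was cascading.tap.Lfs

public class ConnectorSetup {
    // build a flow connector the Cascading 2.x way
    static FlowConnector newConnector() {
        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, ConnectorSetup.class);
        return new HadoopFlowConnector(properties);
    }
}

With Cascading 1.x jars, the original imports and the plain new FlowConnector(properties) in the full code below work unchanged.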

Screenshots of my run:

[three screenshots of the run results, omitted here]

The full code is as follows:

package com.zjf.cascading.example;

/*
 * WordCount example
 * zjf-pc
 * Copyright (c) 2007-2012 Concurrent, Inc. All Rights Reserved.
 * Project and contact information: http://www.concurrentinc.com/
 */

import java.util.Map;
import java.util.Properties;

import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.cascade.Cascades;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexGenerator;
import cascading.operation.regex.RegexReplace;
import cascading.operation.regex.RegexSplitter;
import cascading.operation.xml.TagSoupParser;
import cascading.operation.xml.XPathGenerator;
import cascading.operation.xml.XPathOperation;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.scheme.SequenceFile;
import cascading.scheme.TextLine;
import cascading.tap.Tap;
import cascading.tap.Hfs;
import cascading.tap.Lfs;
import cascading.tuple.Fields;

public class WordCount {

    @SuppressWarnings("serial")
    private static class ImportCrawlDataAssembly extends SubAssembly {
        public ImportCrawlDataAssembly(String name) {
            // split each text line into the "url" and "raw" fields
            RegexSplitter regexSplitter = new RegexSplitter(new Fields("url", "raw"));
            Pipe importPipe = new Each(name, new Fields("line"), regexSplitter);
            // drop all pdf documents
            importPipe = new Each(importPipe, new Fields("url"), new RegexFilter(".*\\.pdf$", true));
            // replace ":nl:" with "\n" and discard the unused fields
            RegexReplace regexReplace = new RegexReplace(new Fields("page"), ":nl:", "\n");
            importPipe = new Each(importPipe, new Fields("raw"), regexReplace, new Fields("url", "page"));
            // this call is required
            setTails(importPipe);
        }
    }

    @SuppressWarnings("serial")
    private static class WordCountSplitAssembly extends SubAssembly {
        public WordCountSplitAssembly(String sourceName, String sinkUrlName, String sinkWordName) {
            // create a new assembly that counts words across all pages and per page
            Pipe pipe = new Pipe(sourceName);
            // convert the HTML to XHTML with TagSoup, keeping only "url" and "xml"
            pipe = new Each(pipe, new Fields("page"), new TagSoupParser(new Fields("xml")), new Fields("url", "xml"));
            // apply an XPath (XML Path Language) expression to the "xml" field to extract the "body" element
            XPathGenerator bodyExtractor = new XPathGenerator(new Fields("body"), XPathOperation.NAMESPACE_XHTML, "//xhtml:body");
            pipe = new Each(pipe, new Fields("xml"), bodyExtractor, new Fields("url", "body"));
            // apply another XPath expression that removes all elements, keeping only text nodes
            // and dropping text nodes inside "script" elements
            String elementXPath = "//text()[ name(parent::node()) != 'script']";
            XPathGenerator elementRemover = new XPathGenerator(new Fields("words"), XPathOperation.NAMESPACE_XHTML, elementXPath);
            pipe = new Each(pipe, new Fields("body"), elementRemover, new Fields("url", "words"));
            // split the document into individual words with a regex and emit each word
            // as a new tuple into the current stream with the "url" and "word" fields
            RegexGenerator wordGenerator = new RegexGenerator(new Fields("word"), "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)");
            pipe = new Each(pipe, new Fields("words"), wordGenerator, new Fields("url", "word"));
            // group by "url"
            Pipe urlCountPipe = new GroupBy(sinkUrlName, pipe, new Fields("url", "word"));
            urlCountPipe = new Every(urlCountPipe, new Fields("url", "word"), new Count(), new Fields("url", "word", "count"));
            // group by "word"
            Pipe wordCountPipe = new GroupBy(sinkWordName, pipe, new Fields("word"));
            wordCountPipe = new Every(wordCountPipe, new Fields("word"), new Count(), new Fields("word", "count"));
            // this call is required
            setTails(urlCountPipe, wordCountPipe);
        }
    }

    public static void main(String[] args) {
        // set the application jar for the current job
        Properties properties = new Properties();
        FlowConnector.setApplicationJarClass(properties, WordCount.class);
        FlowConnector flowConnector = new FlowConnector(properties);

        /*
         * Set the run arguments as follows:
         * right-click this class, choose Run As > Run Configurations > Java Application > Arguments,
         * and enter the following in the Program arguments box:
         *     data/url+page.200.txt output local
         *
         * args[0] is data/url+page.200.txt. It lives under the current application directory
         * and must be a path on the local filesystem. Mine is
         * /home/hadoop/app/workspace/HadoopApplication001/data/url+page.200.txt.
         * The directory must be created by hand and url+page.200.txt must exist;
         * it can be downloaded from the official site.
         *
         * args[1] is the output directory, the second argument; it lives on HDFS.
         * Mine is hdfs://s104:9000/user/hadoop/output, which must be created by hand.
         * After a successful run, three directories are generated under output:
         * pages, urls and words, containing all pages, all urls and all words respectively.
         *
         * args[2] is local, the third argument; it lives on the local filesystem.
         * Mine is /home/hadoop/app/workspace/HadoopApplication001/local.
         * This directory does not need to be created by hand; it is generated automatically
         * after a successful run, and two directories, urls and words, appear under it
         * containing the url counts and the word counts.
         */
        String inputPath = args[0];
        String pagesPath = args[1] + "/pages/";
        String urlsPath = args[1] + "/urls/";
        String wordsPath = args[1] + "/words/";
        String localUrlsPath = args[2] + "/urls/";
        String localWordsPath = args[2] + "/words/";

        // import a text file with crawled pages from the local filesystem into the Hadoop distributed filesystem;
        // the imported file will be a native Hadoop sequence file with the fields "page" and "url".
        // note this example stores crawled pages as a tabbed file, with the first field being the "url"
        // and the second being the "raw" document that had all newline chars ("\n") converted to the text ":nl:".

        // initialize the pipe assembly that imports the crawl data, returning the "url" and "page" fields
        Pipe importPipe = new ImportCrawlDataAssembly("import pipe");
        // create the tap instances
        Tap localPagesSource = new Lfs(new TextLine(), inputPath);
        Tap importedPages = new Hfs(new SequenceFile(new Fields("url", "page")), pagesPath);
        // connect the pipe assembly to the tap instances
        Flow importPagesFlow = flowConnector.connect("import pages", localPagesSource, importedPages, importPipe);

        // split the wordcount pipe defined above into two new pipes, url and word;
        // these pipes could be retrieved via the getTails() method and added to new pipe instances
        SubAssembly wordCountPipe = new WordCountSplitAssembly("wordcount pipe", "url pipe", "word pipe");
        // create Hadoop SequenceFile taps to store the counted results
        Tap sinkUrl = new Hfs(new SequenceFile(new Fields("url", "word", "count")), urlsPath);
        Tap sinkWord = new Hfs(new SequenceFile(new Fields("word", "count")), wordsPath);
        // bind multiple pipes and taps; the keys given here are the pipe names
        Map<String, Tap> sinks = Cascades.tapsMap(new String[]{"url pipe", "word pipe"}, Tap.taps(sinkUrl, sinkWord));
        // wordCountPipe refers to an assembly
        Flow count = flowConnector.connect(importedPages, sinks, wordCountPipe);

        // create an assembly that exports the Hadoop sequence files to local text files
        Pipe exportPipe = new Each("export pipe", new Identity());
        Tap localSinkUrl = new Lfs(new TextLine(), localUrlsPath);
        Tap localSinkWord = new Lfs(new TextLine(), localWordsPath);
        // use the assembly above to connect the two sinks
        Flow exportFromUrl = flowConnector.connect("export url", sinkUrl, localSinkUrl, exportPipe);
        Flow exportFromWord = flowConnector.connect("export word", sinkWord, localSinkWord, exportPipe);

        // load the flows (in any order) and execute
        Cascade cascade = new CascadeConnector().connect(importPagesFlow, count, exportFromUrl, exportFromWord);
        cascade.complete();
    }
}
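With the Program arguments data/url+page.200.txt output local described in the comments above, inputPath resolves to data/url+page.200.txt, the HDFS sinks to output/pages/, output/urls/ and output/words/, and the local exports to local/urls/ and local/words/. If you want a quick look at the exported word counts without opening the files by hand, a minimal sketch follows; the part-00000 file name is only the usual Hadoop naming convention and PrintWordCounts is a throwaway helper of mine, so adjust both to whatever your run actually produced.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

// Throwaway helper (not part of the example) for inspecting the exported word counts.
public class PrintWordCounts {
    public static void main(String[] args) throws IOException {
        // assumes the default Hadoop part-file naming under the "local" output directory
        String partFile = "local/words/part-00000";

        // the TextLine sink writes one tab-separated record per line, e.g. "word<TAB>count"
        Files.lines(Paths.get(partFile))
             .limit(20)                    // show only the first 20 records
             .forEach(System.out::println);
    }
}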
