圓周率π的近似計算(三)-MapReduce分散式計算入門
MapReduce 分散式計算入門
在學 hadoop 的我們最先接觸的分散式框架就是MapReduce框架,本意就是通過使用MapReduce 框架進行實現圓周率 π 的分散式計算的小demo;
MapReduce 的處理流程
Mapper 階段執行流程
第一階段 將輸入目錄下的檔案按照一定的標準進行邏輯切片,形成片規劃.預設的切片規則是按照 檔案塊 切片.每片都有一個 MapTask 進行處理.(getSplits)
第二階段 是對切片中的資料按照一定的規則解析成 key,value 元組.預設規則是把每一行文字內容解析成鍵值對. key 是每一行的起始位置,value 是本行的文字內容.(TextInputFormat)
第三階段 是呼叫Mapper 類中的Map 方法. 上階段每解析出來一個 k,v 呼叫一次 map 方法.輸出同樣是鍵值對,但是可以有多個輸出.
第四階段 是按照一定的規則對第三階段輸出的鍵值對進行分割槽。 預設是隻有一個區。 分割槽的數量就是 Reducer 任務執行的數量。預設只有一個Reducer 任務。
第五階段是對每個分割槽中的鍵值對進行排序。如果有第六階段,那麼進入第六階段;如果沒有,直接輸出到檔案中。
第六階段是對資料進行區域性聚合處理,也就是 combiner 處理。 鍵相等的鍵值對會呼叫一次 reduce 方法。經過這一階段,資料量會減少。 本階段預設是沒有的。
Reduce 階段執行流程
第一階段是 Reducer 任務會主動從 Mapper 任務複製其輸出的鍵值對。Mapper 任務可能會有很多,因此 Reducer 會複製多個 Mapper 的輸出。
第二階段是把複製到 Reducer 本地資料,全部進行合併,即把分散的資料合併成一個大的資料。再對合並後的資料排序。
第三階段是對排序後的鍵值對呼叫 reduce 方法。 鍵相等的鍵值對呼叫一次reduce 方法,每次呼叫會產生零個或者多個鍵值對。最後把這些輸出的鍵值對寫入到 HDFS 檔案中。
進行圓周率 π 的分散式計算
從分 MapReduce 的執行流程看出,分散式框架已經幫我們實現了任務的分發,因此我們的關注點就基本可以不用考慮關於分散式方面的任務,我們主要需要關注的就是 map 方法的構建
,和 reduce 方法的構建.
數學模型
進行分散式計算的任務有個先決條件,那就是可以進行平行計算,就是各 map 之間相互獨立,無依賴關係.
因此,我們在建立數學模型時最好是一個重複計算無相互關聯的模型,而 蒙特卡洛 的模型恰好符合這種特點,因此我們依次構建模型.
在上篇文章中,我們採用的通過面積比的方式來近似求解 圓周率 π ,即正方形面積為 1 ,扇形面積為 π/4,則正方形和扇形的面積比為 π/4,求得這個面積比,我們就能得到最後的 π;
蒙特卡羅方法,就是將面積比轉化為概率問題求解,就是在正方形中取一隨機點進行重複試驗,統計出這個點落在扇形中的概率.用這個概率去描述扇形和正方形的面積比,求得概率即可近似求出圓周率 π 的大小;
隨機試驗的優點是,每次試驗相互獨立互不影響,缺點隨機性大,資料不穩定,一般只用做近似求解.
綜上,我們可以將 求解圓周率的任務 轉化為隨機試驗的統計工作.
實現方法
首先我們給出Mapper階段方法
public class SolvingPiMapper extends Mapper<LongWritable, Text, Text, Text> { /** * key 輸入 讀取檔案的起始位置 * value 輸入 檔案中一行的內容 * context 輸出 <k,v>形式 */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { //將從檔案中讀取的隨機試驗次數解析出來 String num = value.toString(); Integer totle = new Integer(num); //宣告隨機試驗中落點在扇形中數量 int sum = 0; //進行隨機試驗並統計 for(int i=0;i<totle;i++){ double x = Math.random(); double y = Math.random(); if((x*x+y*y)<1){ sum++; } } //將最後結果輸出 context.write(new Text("PI"), new Text(totle+"--"+sum)); } }
給出 reduce 方法
public class SolvingPiReducer extends Reducer<Text, Text, Text, DoubleWritable> { /** * name 輸入的 "PI" * message 輸入的"totle--num" * context 輸出的<k,v> * 所有鍵位"PI"的輸入都用這個方法進行處理 */ @Override protected void reduce(Text name, Iterable<Text> message, Reducer<Text, Text, Text, DoubleWritable>.Context context) throws IOException, InterruptedException { //宣告試驗進行的總數 long sumTotle =0; //宣告落點在扇形區域中的總數 long sumOrder =0; //解析輸入的message資訊,從這提取上述兩個值 for (Text text : message) { String[] nums = text.toString().split("--"); sumTotle+= new Integer(nums[0]); sumOrder+= new Integer(nums[1]); } //System.out.println("π的近似值為"+sumOrder*4.0/sumTotle); //輸出最後結果 context.write(name,new DoubleWritable(sumOrder*4.0/sumTotle)); } }
解析 map 方法返回的資訊,進行彙總並輸出最後結果.
定義一個主類,用來描述job並提交job
public class SolvingPiRunner { //把業務邏輯相關的資訊(哪個是 mapper,哪個是 reducer,要處理的資料在哪裡,輸出的結果放在哪裡……)描述成一個 job 物件 //把這個描述好的 job 提交給叢集去執行 public static void main(String[] args) throws Exception { //使用者自定義輸入 System.out.println("請輸入你想分的片數:"); Scanner sc = new Scanner(System.in); int pice=new Integer(sc.nextLine()); System.out.println("請輸入你每片執行多少次:"); String line=sc.nextLine(); //按照分片生成檔案(在實際環境中需要在hdfc中建立檔案) for(int i=0;i<pice;i++){ BufferedWriter bw = new BufferedWriter(new FileWriter(new File("D:\\hadoop\\input\\"+(i+1)+".txt"))); bw.write(line); bw.close(); } //把業務邏輯相關的資訊(哪個是 mapper,哪個是 reducer,要處理的資料在哪裡,輸出的結果放在哪裡……)描述成一個 job 物件 //把這個描述好的 job 提交給叢集去執行 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //知道這個job所在jar包 job.setJarByClass(SolvingPiRunner.class); job.setMapperClass(SolvingPiMapper.class); job.setReducerClass(SolvingPiReducer.class); //設定我們的業務邏輯Mapper類的輸出key 和 value 的資料 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //設定我們的業務邏輯Reducer 類的輸出Key和value 的資料型別 job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); //指定要處理的資料所在的位置 FileInputFormat.setInputPaths(job, "D:\\hadoop\\input\\*.txt"); //指定處理完成後,結果所儲存的位置 FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output\\result")); //向yarn叢集提交這個job boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
在windows環境下模擬叢集環境執行測試;
遇到的問題
啟動問題報錯
Exception in thread "main" java.io.IOException: (null) entry in command string: null chmod 0700 D:\tmp\hadoop-lxc\mapred\staging\lxc1332581434\.staging at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773) at org.apache.hadoop.util.Shell.execCommand(Shell.java:869) at org.apache.hadoop.util.Shell.execCommand(Shell.java:852) at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733) at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:491) at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:532) at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:509) at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:312) at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:133) at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:144) at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290) at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746) at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287) at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308) at test.demo.SolvingPiRunner.main(SolvingPiRunner.java:54)
解決方法
報錯二
Exception in thread "main" java.lang.RuntimeException: Error while running command to get file permissions : java.io.IOException: (null) entry in command string: null ls -F D:\hadoop\input\1.txt
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
at org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:1097)
at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:659)
at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:634)
at org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:49)
at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1733)
at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1713)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:305)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:387)
at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
at test.demo.SolvingPiRunner.main(SolvingPiRunner.java:54)
原因及解決辦法
- 在windows環境下讀取檔案不能直接寫檔案所在路徑,需要寫到檔案,如果需要讀多個檔案可以用萬用字元 * 代之多個;
測試結果
我啟動程式進行運算
10X10000000 次
結果
PI 3.141599752