1. 程式人生 > >C# Hadoop

C# Hadoop

  1. using System;  
  2. using System.Collections.Generic;  
  3. using System.Linq;  
  4. using System.Text;  
  5. using System.Threading.Tasks;  
  6. using Microsoft.Hadoop;  
  7. using Microsoft.Hadoop.MapReduce;  
  8. using Microsoft.Hadoop.WebClient.WebHCatClient;  
  9. using System.Diagnostics;  
  10. using System.IO;  
  11. using System.IO.MemoryMappedFiles;  
  12. namespace HadoopConsol  
  13. {  
  14.     class Program  
  15.     {  
  16.         static void Main(string[] args)  
  17.         {  
  18.             Stopwatch sw = new Stopwatch();  
  19.             long hadoopTime=0;  
  20.             long normalTime=0;  
  21.             sw.Start();  
  22.             //start hadoop  
  23.             Console.WriteLine("     Hadoop Process Strating ....");  
  24.             #region Hadoop time  
  25.             #region hadoopconnet  
  26.             Console.WriteLine("     Hadoop Connect Strating ....");  
  27.             //establish job configuration  
  28.             HadoopJobConfiguration myConfig = new HadoopJobConfiguration();  
  29.             myConfig.InputPath = "/demo/simple/in";  
  30.             myConfig.OutputFolder = "/demo/simple/out";  
  31.             //connect to cluster  
  32.             Uri myUri = new Uri("http://localhost");  
  33.             string userName = "hadoop";  
  34.             string passWord = null;  
  35.             IHadoop myCluster = Hadoop.Connect(myUri, userName, passWord);  
  36.             hadoopTime += sw.ElapsedMilliseconds;  
  37.             Console.WriteLine("     Hadoop Connect End.");  
  38.             Console.WriteLine("     Hadoop Connect time:" + sw.ElapsedMilliseconds);  
  39.             #endregion  
  40.             #region hadoopmapreduce  
  41.             sw.Reset();  
  42.             sw.Start();  
  43.             Console.WriteLine("     Hadoop MapReduce Strating ....");  
  44.             //execute mapreduce job  
  45.             MapReduceResult jobResult =  
  46.                 myCluster.MapReduceJob.Execute<MySimpleMapper, MySimpleReducer>(myConfig);  
  47.             hadoopTime += sw.ElapsedMilliseconds;  
  48.             Console.WriteLine("     Hadoop MapReduce End.");  
  49.             Console.WriteLine("     Hadoop MapReduce Time:"+sw.ElapsedMilliseconds);  
  50.             #endregion  
  51.             #region Hadoop End  
  52.             sw.Reset();  
  53.             sw.Start();  
  54.             Console.WriteLine("     Hadoop Endprocess Strating ....");  
  55.             //write job result to console  
  56.             int exitCode = jobResult.Info.ExitCode;  
  57.             string exitStatus = "Failure";  
  58.             if (exitCode == 0) exitStatus = "Success";  
  59.             exitStatus = exitCode + " (" + exitStatus + ")";  
  60.             Console.WriteLine();  
  61.             Console.Write("Exit Code = " + exitStatus);  
  62.             Console.WriteLine("     Hadoop Endprocess End.");  
  63.             hadoopTime += sw.ElapsedMilliseconds;  
  64.             Console.WriteLine("     Hadoop Exit Time:" + sw.ElapsedMilliseconds);  
  65.             Console.WriteLine("     Hadoop Process All Time:" + hadoopTime);  
  66.             #endregion  
  67.             #endregion  
  68.             #region Normal time  
  69.             //start Normal  
  70.             Console.WriteLine("     Normal Process Strating ....");  
  71.             sw.Reset();  
  72.             sw.Start();  
  73.             //normal process  
  74.             #region Normal Process  
  75.             int myevenCount = 0;  
  76.             int myeventSum = 0;  
  77.             int myoddCount = 0;  
  78.             int myoddSum = 0;  
  79.             StreamReader fs = new StreamReader(@"c:\TEMP\integers.txt");  
  80.             while (fs.Peek() >= 0)  
  81.             {  
  82.                 string strTemp = fs.ReadLine();  
  83.                 if (Int32.Parse(strTemp) % 2 == 0)  
  84.                 {  
  85.                     myevenCount++;  
  86.                     myeventSum += Int32.Parse(strTemp);  
  87.                 }  
  88.                 else  
  89.                 {  
  90.                     myoddCount++;  
  91.                     myoddSum += Int32.Parse(strTemp);  
  92.                 }  
  93.             }  
  94.                //MemoryMappedFile m = MemoryMappedFile.  
  95.             Console.WriteLine("even:" + "\t" + myevenCount + "\t" + myeventSum);  
  96.             Console.WriteLine("odd:" + "\t" + myoddCount + "\t" + myoddSum);  
  97.             #endregion  
  98.             Console.WriteLine("     Normal Process End.");  
  99.             normalTime += sw.ElapsedMilliseconds;  
  100.             Console.WriteLine("     Normal Exit Time:" + sw.ElapsedMilliseconds);  
  101.             Console.WriteLine("     Normal Process All Time:" + normalTime);  
  102.             #endregion  
  103.             sw.Stop();  
  104.             Console.Read();  
  105.         }  
  106.     }  
  107.     public class MySimpleMapper : MapperBase  
  108.     {  
  109.         public override void Map(string inputLine, MapperContext context)  
  110.         {  
  111.             //interpret the incoming line as an integer value  
  112.             int value = int.Parse(inputLine);  
  113.             //determine whether value is even or odd  
  114.             string key = (value % 2 == 0) ? "even" : "odd";  
  115.             //output key assignment with value  
  116.             context.EmitKeyValue(key, value.ToString());  
  117.         }  
  118.     }  
  119.     public class MySimpleReducer : ReducerCombinerBase  
  120.     {  
  121.         public override void Reduce(  
  122.             string key, IEnumerable<string> values, ReducerCombinerContext context  
  123.             )  
  124.         {  
  125.             //initialize counters  
  126.             int myCount = 0;  
  127.             int mySum = 0;  
  128.             //count and sum incoming values  
  129.             foreach (string value in values)  
  130.             {  
  131.                 mySum += int.Parse(value);  
  132.                 myCount++;  
  133.             }  
  134.             //output results  
  135.             context.EmitKeyValue(key, myCount + "\t" + mySum);  
  136.         }  
  137.     }  
  138. }