用IDEA編寫一個wordcount
阿新 • • 發佈:2018-12-16
建立一個maven專案:
在pom.xml中插入以下程式碼,匯入對應包:這裡注意<mainClass>cn.itcast.hadoop.wordcountdrive</mainClass>,不新增主類路徑hadoop jar ***.jar命令無法找到執行主類
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.7.1</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>jdk.tools</groupId> <artifactId>jdk.tools</artifactId> <version>1.8</version> <scope>system</scope> <systemPath>C:/Program Files/Java/jdk1.8.0_144/lib/tools.jar</systemPath> </dependency> </dependencies> <build> <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --> <plugins> <plugin> <artifactId>maven-clean-plugin</artifactId> <version>3.0.0</version> </plugin> <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging --> <plugin> <artifactId>maven-resources-plugin</artifactId> <version>3.0.2</version> </plugin> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.7.0</version> </plugin> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.20.1</version> <!--<configuration>--> <!--<source>1.8</source>--> <!--<target>1.8</target>--> <!--<encoding>UTF-8</encoding>--> <!--</configuration>--> </plugin> <plugin> <artifactId>maven-jar-plugin</artifactId> <version>3.0.2</version> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib</classpathPrefix> <mainClass>cn.itcast.hadoop.wordcountdrive</mainClass> </manifest> </archive> </configuration> </plugin> <plugin> <artifactId>maven-install-plugin</artifactId> <version>2.5.2</version> </plugin> <plugin> <artifactId>maven-deploy-plugin</artifactId> <version>2.8.2</version> </plugin> </plugins> </pluginManagement> </build>
在Java包中建立itcast.hadoop包,建立wordcountmapper,wordcountreducer和wordcountdrive類:
wordcountdrive:這個類就是mr程式執行時的主類 告訴本類中組裝了一些程式執行時所需的資訊 比如哪個reduce或mapper類 輸入資料在哪 輸出資料在哪
public class wordcountdrive { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 通過job這個類來封裝本次mr的相關資訊 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //指定本次mrjobjar包的執行主類 job.setJarByClass(wordcountdrive.class); //指定reducetask的個數 job.setNumReduceTasks(3); //指定重寫的分割槽類 job.setPartitionerClass(PPartitioner.class); //指定本次mr所用的mapper reduce類分別是什麼 job.setMapperClass(wordcountmapper.class); job.setReducerClass(wordcountreduce.class); //指定本次mrmapper階段的輸出kv型別 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定本次mr最終輸出的kv型別 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定本次mr輸入的資料路徑和最終輸出結果存放於在什麼位置 FileInputFormat.setInputPaths(job, "/wordcount/input"); FileOutputFormat.setOutputPath(job, new Path("/wordcount/output")); //job.submit(); //提交程式 並且監控列印執行情況 boolean b=job.waitForCompletion(true); System.exit(b?0:1); } }
wordcountmapper:這裡就是map階段具體的業務邏輯實現方法 該方法的呼叫取決於讀取資料的元件有沒有給mr傳入資料 如果有的話 每傳入一個kv對 該方法就會被呼叫一次
/* * 這裡就是mapreduce程式 map階段逐漸實現的類 * <KEYIN, VALUEIN, KEYOUT, VALUEOUT> * KEYIN表示mapper資料輸入的時候,在預設的讀取資料元件下叫inputformat, * 他的行為是一行一行的讀取待處理的資料, * 讀取一行返回一行給我們的mr程式,在這種情況下, * KEYIN就表示我們的每一行的起始偏移量 * 因此資料型別是long型別 * VALUEIN表示mapper資料輸入的時候value的資料型別, * 在預設的資料讀取情況下,valuein就表示讀取的這一行的內容 * 因此資料型別是string * KETOUT表示mapper資料輸出的時候key的資料型別 在本案列種 key是單詞, * 所以是string * VALUEOUT表示mapper資料輸出的時候value的資料型別 。。。是integer * 這裡所說的資料型別是jdk自帶的型別 在序列化時 效率低下 * 因此hadoop自己封裝了一套資料型別 */ public class wordcountmapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * 這裡就是map階段具體的業務邏輯實現方法 該方法的呼叫取決於讀取資料的元件有沒有給mr傳入資料 如果有的話 * 沒傳入一個kv對 該方法就會被呼叫一次 */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { //拿到傳入進來的型別 轉化為string String line=value.toString(); //將這一行內容按照分隔符,進行一行內容的切割 //切割成一個個單詞陣列 String[] words=line.split(" "); //遍歷陣列 沒出現一個單詞 就標記一個數字1 //<單詞,1> for(String word:words){ //使用mr的程式的上下文context,吧map階段處理的資料傳送出去 //作為reduce階段輸入資料 context.write(new Text(word),new IntWritable(1)); } } public static void main(String[] args) { // TODO Auto-generated method stub } }
wordcountreduce:reduce接受後 按照key的字典序進行排序 按照key是否相同作為一組去呼叫reduce方法 本方法的k就是這一組相同kv對的共同key 把這一組所有的v作為迭代器傳入我們的reduce方法
public class wordcountreduce extends
Reducer<Text, IntWritable, Text, IntWritable> {
/**
* reduce接受後 按照key的字典序進行排序 按照key是否相同作為一組去呼叫reduce方法 本方法的k就是這一組相同kv對的共同key
* 吧這一組所有的v作為迭代器傳入我們的reduce方法
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
// super.reduce(arg0, arg1, arg2);
int count = 0;
// 遍歷一組迭代器,吧每一個數量一累加起來 就構成了單詞的總次數
for (IntWritable value : values) {
count += value.get();
}
// 吧最終的結果輸出
context.write(key, new IntWritable(count));
}
public static void main(String[] args) {
// TODO Auto-generated method stub
}
}
PPartitioner :按照首字母ASCII值進行分割槽,因為每個reduce會進行自排序,所以我們只用分好區就好了,排序交給框架。
package cn.itcast.hadoop;
import org.apache.hadoop.mapreduce.Partitioner;
public class PPartitioner extends Partitioner {
@Override
public int getPartition(Object o, Object o2, int i) {
String value = o.toString();
//取首字母
char word = value.charAt(0);
int asc = word;
if (97 <= asc && asc <= 102) {
return 0;
} else if (102 < asc && asc <= 109) {
return 1;
}
return 2;
}
}
將專案打成jar包,打包成功後會在target下看到打好的jar包:
將這個jar包放在叢集裡執行
成功。