MapReduce功能實現四---小綜合(從hbase中讀取資料統計並在hdfs中降序輸出Top 3)
MapReduce功能實現系列:
方法一:
在Hbase中建立相應的表1:
create 'hello','cf' put 'hello','1','cf:hui','hello world' put 'hello','2','cf:hui','hello hadoop' put 'hello','3','cf:hui','hello hive' put 'hello','4','cf:hui','hello hadoop' put 'hello','5','cf:hui','hello world' put 'hello','6','cf:hui','hello world' put 'hello','7','cf:hui','hbase hive'
java程式碼:
import java.io.IOException; import java.util.Comparator; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class HbaseTopJiang1 { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { String tablename = "hello"; Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "h71"); Job job = new Job(conf, "WordCountHbaseReader"); job.setJarByClass(HbaseTopJiang1.class); Scan scan = new Scan(); TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, IntWritable.class, job); job.setReducerClass(WordCountHbaseReaderReduce.class); FileOutputFormat.setOutputPath(job, new Path(args[0])); MultipleOutputs.addNamedOutput(job, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } public static class doMapper extends TableMapper<Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { /*不進行分隔,將value整行全部獲取 String rowValue = Bytes.toString(value.list().get(0).getValue()); context.write(new Text(rowValue), one); */ String[] rowValue = Bytes.toString(value.list().get(0).getValue()).split(" "); for (String str: rowValue){ word.set(str); context.write(word,one); } } } public static final int K = 3; public static class WordCountHbaseReaderReduce extends Reducer<Text, IntWritable, Text, IntWritable> { //定義treeMap來保持統計結果,由於treeMap是按key升序排列的,這裡要人為指定Comparator以實現倒排 private TreeMap<Integer, String> treeMap = new TreeMap<Integer, String>(new Comparator<Integer>() { @Override public int compare(Integer x, Integer y) { return y.compareTo(x); } }); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //reduce後的結果放入treeMap,而不是向context中記入結果 int sum = 0; for (IntWritable val : values) { sum += val.get(); } if (treeMap.containsKey(sum)){ String value = treeMap.get(sum) + "," + key.toString(); treeMap.put(sum,value); }else { treeMap.put(sum, key.toString()); } if(treeMap.size() > K) { treeMap.remove(treeMap.lastKey()); } } protected void cleanup(Context context) throws IOException, InterruptedException { //將treeMap中的結果,按value-key順序寫入contex中 for (Integer key : treeMap.keySet()) { context.write(new Text(treeMap.get(key)), new IntWritable(key)); } } } }
在Linux中執行該程式碼:
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/javac HbaseTopJiang1.java
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseTopJiang1*class
[[email protected] q1]$ hadoop jar xx.jar HbaseTopJiang1 /out
[[email protected] q1]$ hadoop fs -cat /out/part-r-00000
hello 6
world 3
hadoop,hive 2
方法二:
truncate 'hello'
put 'hello','1','cf:hui','hello world world'
put 'hello','2','cf:hui','hello hadoop hadoop'
put 'hello','3','cf:hui','hello hive hive'
put 'hello','4','cf:hui','hello hadoop hadoop'
put 'hello','5','cf:hui','hello world world'
put 'hello','6','cf:hui','hello world world'
put 'hello','7','cf:hui','hbase hive hive'
注意:相同單詞之間的分隔符是"/t"(Tab鍵),結果hbase中插入資料的時候根本就不能插入製表符,所以該方法破產,可以參考一下思想java程式碼:
import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class HbaseTopJiang2{
public static class doMapper extends TableMapper<Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
/*不進行分隔,將value整行全部獲取
String rowValue = Bytes.toString(value.list().get(0).getValue());
context.write(new Text(rowValue), one);
*/
String[] rowValue = Bytes.toString(value.list().get(0).getValue()).split(" ");
for (String str: rowValue){
word.set(str);
context.write(word,one);
}
}
}
public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
@Override
public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int total=0;
for (IntWritable val : values){
total++;
}
context.write(key, new IntWritable(total));
}
}
public static final int K = 3;
/**
* 把上一個mapreduce的結果的key和value顛倒,調到後就可以按照key排序了。
*/
public static class KMap extends Mapper<LongWritable,Text,IntWritable,Text> {
TreeMap<Integer, String> map = new TreeMap<Integer, String>();
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String result[] = line.split("\t");
StringBuffer hui = null;
if(result.length > 2){ //我怕在往hbase表輸入資料時帶\t分隔符的,後來發現hbase中插入資料的時候根本就不能插入製表符
for(int i=0;i<result.length-2;i++){
hui=new StringBuffer().append(result[i]);
}
}else{
hui = new StringBuffer().append(result[0]);
}
if(line.trim().length() > 0 && line.indexOf("\t") != -1) {
String[] arr = line.split("\t", 2);
String name = arr[0];
Integer num = Integer.parseInt(arr[1]);
if (map.containsKey(num)){
String value1 = map.get(num) + "," + hui;
map.put(num,value1);
}
else {
map.put(num, hui.toString());
}
if(map.size() > K) {
map.remove(map.firstKey());
}
}
}
@Override
protected void cleanup(Mapper<LongWritable, Text, IntWritable, Text>.Context context)
throws IOException, InterruptedException {
for(Integer num : map.keySet()) {
context.write(new IntWritable(num), new Text(map.get(num)));
}
}
}
/**
* 按照key的大小來劃分區間,當然,key是int值
*/
public static class KeySectionPartitioner<K, V> extends Partitioner<K, V> {
@Override
public int getPartition(K key, V value, int numReduceTasks) {
/**
* int值的hashcode還是自己本身的數值
*/
//這裡我認為大於maxValue的就應該在第一個分割槽
int maxValue = 50;
int keySection = 0;
// 只有傳過來的key值大於maxValue 並且numReduceTasks比如大於1個才需要分割槽,否則直接返回0
if (numReduceTasks > 1 && key.hashCode() < maxValue) {
int sectionValue = maxValue / (numReduceTasks - 1);
int count = 0;
while ((key.hashCode() - sectionValue * count) > sectionValue) {
count++;
}
keySection = numReduceTasks - 1 - count;
}
return keySection;
}
}
/**
* int的key按照降序排列
*/
public static class IntKeyDescComparator extends WritableComparator {
protected IntKeyDescComparator() {
super(IntWritable.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
return -super.compare(a, b);
}
}
/**
* 把key和value顛倒過來輸出
*/
public static class SortIntValueReduce extends Reducer<IntWritable, Text, Text, IntWritable> {
private Text result = new Text();
@Override
public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text val : values) {
result.set(val.toString());
context.write(result, key);
}
}
}
public static void main(String[] args) throws Exception {
String tablename = "hello";
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "h71");
Job job1 = new Job(conf, "WordCountHbaseReader");
job1.setJarByClass(HbaseTopJiang2.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, IntWritable.class, job1);
job1.setReducerClass(WordCountReducer.class);
FileOutputFormat.setOutputPath(job1, new Path(args[0]));
MultipleOutputs.addNamedOutput(job1, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);
Job job2 = Job.getInstance(conf, "Topjiang");
job2.setJarByClass(HbaseTopJiang2.class);
job2.setMapperClass(KMap.class);
job2.setSortComparatorClass(IntKeyDescComparator.class);
job2.setPartitionerClass(KeySectionPartitioner.class);
job2.setReducerClass(SortIntValueReduce.class);
job2.setOutputKeyClass(IntWritable.class);
job2.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job2, new Path(args[0]));
FileOutputFormat.setOutputPath(job2, new Path(args[1]));
//提交job1及job2,並等待完成
if (job1.waitForCompletion(true)) {
System.exit(job2.waitForCompletion(true) ? 0 : 1);
}
}
}
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/javac HbaseTopJiang2.java[[email protected] q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseTopJiang2*class
[[email protected] q1]$ hadoop jar xx.jar HbaseTopJiang2 /out /output
[[email protected] q1]$ hadoop fs -ls /out
-rw-r--r-- 2 hadoop supergroup 0 2017-03-18 19:02 /out/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 32 2017-03-18 19:02 /out/part-r-00000
[[email protected] q1]$ hadoop fs -ls /output
-rw-r--r-- 2 hadoop supergroup 0 2017-03-18 19:02 /output/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 25 2017-03-18 19:02 /output/part-r-00000
理想結果:
[[email protected] q1]$ hadoop fs -cat /out/part-r-00000
hbase1
hadoophadoop
2
hello6
hivehive
2
worldworld
3
[[email protected] q1]$ hadoop fs -cat /output/part-r-00000
hello6
worldworld
3
hadoophadoop,hive hive 2
(分隔符都為製表符)
我發現製表符(Tab鍵)從UltraEdit複製到SecureCRT正常,而從SecureCRT複製到UltraEdit則製表符會變成空格,也是醉了。。。
相關推薦
MapReduce功能實現四---小綜合(從hbase中讀取資料統計並在hdfs中降序輸出Top 3)
MapReduce功能實現系列: 方法一: 在Hbase中建立相應的表1: create 'hello','cf' put 'hello','1','cf:hui','hello world' put 'hello','2','cf:hui','h
JAVA從Excel中讀取資料儲存到資料庫中
1.jar包 2.資料庫資訊 3.JDBC連線資料庫工具類 package Test; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedSta
從Matlab .fig檔案中讀取資料,並重新繪圖
Matlab提供了強大的函式集合,可以從.fig檔案中讀取圖中的資料,並重新繪製圖形。如果原始資料丟失,我們可以從.fig檔案中恢復原始資料,並基於原始資料做進一步的處理。 以下是一個從兩個不同檔案
MapReduce功能實現一---Hbase和Hdfs之間資料相互轉換
MapReduce功能實現系列: 一、從Hbase表1中讀取資料再把統計結果存到表2 在Hbase中建立相應的表1: create 'hello','cf' put 'hello','1','cf:hui','hello world' put 'he
從hbase表1中讀取資料,最終結果寫入到hbase表2 ,如何通過MapReduce實現 ?
需要一: 將hbase中‘student’表中的info:name和info:age兩列資料取出並寫入到hbase中‘user’表中的basic:XM和basic:NL class ReadStudentMapper extends Table
Android 二維碼開發功能實現(四)------基於Zxing實現編碼功能(生成二維碼,一維碼等)
Android 二維碼開發功能實現(四)------基於Zxing實現編碼功能(生成二維碼,一維碼等) 前言 關於Google的開源庫Zxing,前面也寫了幾遍文章進行介紹.我們先簡單的回顧一下! Android 二維碼的掃碼功能實現(一) 這篇文章主要介紹了,Zxi
MapReduce功能實現
MapReduce功能實現系列: MapReduce功能實現一---Hbase和Hdfs之間資料相互轉換 MapReduce功能實現二---排序 MapReduce功能實現三---Top N MapReduce功能實現四---小綜合(從hbas
向HBase中匯入資料3:使用MapReduce從HDFS或本地檔案中讀取資料並寫入HBase(增加使用Reduce批量插入)
前面我們介紹了:為了提高插入效率,我們在前面只使用map的基礎上增加使用reduce,思想是使用map-reduce操作,將rowkey相同的項規約到同一個reduce中,再在reduce中構建put物件實現批量插入測試資料如下:注意到有兩條記錄是相似的。package cn
mapreduce從hbase大量讀資料超時異…
16/05/06 19:56:13 INFO mapreduce.Job: Task Id : attempt_1461653563167_0008_m_000001_2, Status : FAILED Error: org.apache.hadoop.hbase.client.RetriesExhau
MapReduce功能實現六---最大值(Max)、求和(Sum)、平均值(Avg)
MapReduce功能實現系列: 一、最大值(Max) 情況1: [[email protected] q1]$ vi ql.txt aa 111 22 555 [[email protected] q1]$ hadoop fs
HBase建表高階屬性,hbase應用案例看行鍵設計,HBase和mapreduce結合,從Hbase中讀取資料、分析,寫入hdfs,從hdfs中讀取資料寫入Hbase,協處理器和二級索引
1. Hbase高階應用 1.1建表高階屬性 下面幾個shell 命令在hbase操作中可以起到很到的作用,且主要體現在建表的過程中,看下面幾個create 屬性 1、 BLOOMFILTER 預設是NONE 是否使用布隆過慮及使用何種方式 布隆
python3 簡單實現從csv文件中讀取內容,並對內容進行分類統計
tmp spa writer ict 打開文件 while 類型 spl blog 新手python剛剛上路,在實際工作中遇到如題所示的問題,嘗試使用python3簡單實現如下,歡迎高手前來優化import csv #打開文件,用with打開可以不用去特意關閉file了
用shell實現一個小指令碼,用來同來統計自己某個檔案下的程式碼,總的程式碼行數,總的註釋量,總的空行量?支援遍歷查詢,支援軟連結查詢
[[email protected] yunwei]# cat sum_code_row_version1.4.sh #!/bin/bash # File Name: sum_code_row.sh # Author: Liwqiang # mail: [email
Android Studio平臺下使用hellochart實現從txt檔案讀取資料繪折線圖
Android Studio平臺下使用hellochart實現從文字讀取資料繪折線圖 本人是一個剛剛接觸Android不超過兩個月的小白,最近在做的論文是關於這一塊的相關內容。所有的東西都是自學的,聽導師的建議也是第一次留個這樣的資料,可能有很多地方理解不到位,
python實現從檔案中讀取資料並繪製成 x y 軸圖形
import matplotlib.pyplot as plt import numpy as np def readfile(filename): dataList = [] dataNum = 0 with open(filename,'r')
Kafka系列(四)Kafka消費者:從Kafka中讀取資料
本系列文章為對《Kafka:The Definitive Guide》的學習整理,希望能夠幫助到大家應用從Kafka中讀取資料需要使用KafkaConsumer訂閱主題,然後接收這些主題的訊息。在我們深入這些API之前,先來看下幾個比較重要的概念。Kafka消費者相關的概念消
Spark支援四種方式從資料庫中讀取資料
目前Spark支援四種方式從資料庫中讀取資料,這裡以Mysql為例進行介紹。 一、不指定查詢條件 這個方式連結MySql的函式原型是: def jdbc(url: String, table: String, properties: Properties):
java實現k-means演算法(用的鳶尾花iris的資料集,從mysq資料庫中讀取資料)
k-means演算法又稱k-均值演算法,是機器學習聚類演算法中的一種,是一種基於形心的劃分方法,其中每個簇的中心都用簇中所有物件的均值來表示。其思想如下: 輸入: k:簇的數目;D:包含n個物件的資料集。輸出:k個簇的集合。 方法: 從D中隨機選擇幾個物件作為起始質心
live555從RTSP伺服器讀取資料到使用接收到的資料流程分析
本文在linux環境下編譯live555工程,並用cgdb除錯工具對live555工程中的testProgs目錄下的openRTSP的執行過程進行了跟蹤分析,直到將從socket端讀取視訊資料並儲存為對應的視訊和音訊資料為止。 進入testProgs目錄,執行./openRTSP rtsp://
python 從檔案中讀取資料,同時去除掉空格和換行
從檔案中讀取資料,同時去除掉空格和換行,程式碼如下 import numpy as np def sort(path): w = open(path,'r') l = w.readlines() col=[] for k in l: k = k.strip('\n')