
HBase: loading data from HDFS into HBase

This example runs a MapReduce job: the mapper parses each line of a tab-separated file on HDFS into an HBase Put, and a TableReducer writes those Puts into the fruit_mr table. First, the Maven dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-mapreduce</artifactId>
        <version>2.0.2</version>
    </dependency>
</dependencies>
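In HBase 2.x the MapReduce integration lives in the separate hbase-mapreduce artifact, which provides the TableMapReduceUtil and TableReducer classes used below; hbase-client supplies the client-side classes such as Put.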

Mapper

package cn.hbase.mapreduce.hdfs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author Tele
 *
 * Input key: byte offset of the line in the HDFS text file
 * Input value: the line itself
 * Output key: the row key
 * Output value: the Put holding one row to insert into HBase (a Put requires a row key)
 */
public class ReadFruitFromHdfsMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line
        String line = value.toString();

        // Split on tabs; the input looks like:
        // 1001    apple       red
        // 1002    pear        yellow
        // 1003    pineapple   yellow
        String[] fields = line.split("\t");

        // Each column family maps to several columns
        Map<String, Object> map = new HashMap<String, Object>();

        // Collect the columns under the "info" column family
        List<String> infoCNList = new ArrayList<String>();
        infoCNList.add("name");  // value comes from fields[1]
        infoCNList.add("color"); // value comes from fields[2]
        map.put("info", infoCNList);

        String row = fields[0];

        // Build the Put for this row
        Put put = new Put(Bytes.toBytes(row));

        // Walk the map and add every column of every column family to the Put
        Set<Entry<String, Object>> entrySet = map.entrySet();
        Iterator<Entry<String, Object>> iterator = entrySet.iterator();
        while (iterator.hasNext()) {
            Entry<String, Object> entry = iterator.next();
            String cf = entry.getKey();
            List<String> cnList = (List<String>) entry.getValue();

            // The i-th column name takes its value from fields[i + 1]
            for (int i = 0; i < cnList.size(); i++) {
                put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cnList.get(i)), Bytes.toBytes(fields[i + 1]));
            }
        }

        // Row key
        ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(fields[0]));

        // Emit
        context.write(immutableBytesWritable, put);
    }
}
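For reference, the tab-separated input the mapper expects (the rows shown in the comment above) would be uploaded to /input_fruit on HDFS, one record per line:

1001	apple	red
1002	pear	yellow
1003	pineapple	yellow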

Reducer

package cn.hbase.mapreduce.hdfs;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

/**
 * @author Tele
 *
 * To write to a table in HBase, it is enough to extend TableReducer.
 */
public class WriteFruitReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> value, Context context) throws IOException, InterruptedException {
        // Pass each Put for this row key straight through to the output table
        for (Put put : value) {
            context.write(NullWritable.get(), put);
        }
    }
}
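Note that TableReducer<KEYIN, VALUEIN, KEYOUT> fixes the Reducer's output value type to Mutation, so whatever is written out must be a Mutation such as a Put or Delete. Here the mapper's Puts are passed through unchanged; the output key can be NullWritable because the Put itself already carries the row key.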

Runner

package cn.hbase.mapreduce.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author Tele
 */
public class FruitRunner extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        // Create the job from the configuration ToolRunner injected
        Job job = Job.getInstance(this.getConf());

        // Set the jar
        job.setJarByClass(FruitRunner.class);

        // Wire up the mapper
        job.setMapperClass(ReadFruitFromHdfsMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Input path on HDFS
        FileInputFormat.addInputPath(job, new Path("/input_fruit"));

        // Wire up the reducer; "fruit_mr" is the target HBase table
        TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitReducer.class, job);

        // Number of reduce tasks
        job.setNumReduceTasks(1);

        // Submit and wait for completion
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // HBaseConfiguration.create() loads hbase-site.xml (ZooKeeper quorum etc.);
        // pass it to ToolRunner so run() receives it via getConf()
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new FruitRunner(), args);
        System.exit(status);
    }
}
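Once the project is packaged into a jar, the job can be submitted with the hadoop jar command, for example: hadoop jar fruit.jar cn.hbase.mapreduce.hdfs.FruitRunner (the jar name fruit.jar is only a placeholder). No explicit output format is set in run() because TableMapReduceUtil.initTableReducerJob already configures TableOutputFormat against the fruit_mr table.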

PS: the target table (fruit_mr, with column family info) must be created in advance.
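The table can be created from the HBase shell with create 'fruit_mr', 'info'. Alternatively, here is a minimal programmatic sketch using the HBase 2.x Admin API; the class name CreateFruitTable is an assumption for illustration, while the table and column-family names follow the code above:

package cn.hbase.mapreduce.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

// Hypothetical helper class, not part of the original post
public class CreateFruitTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName table = TableName.valueOf("fruit_mr");
            // Create fruit_mr with the single column family "info" if it is missing
            if (!admin.tableExists(table)) {
                admin.createTable(TableDescriptorBuilder.newBuilder(table)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
                        .build());
            }
        }
    }
}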