File-Based Data Structures: A SequenceFile Implementation
阿新 • Published: 2018-12-05
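Hadoop's SequenceFile provides a persistent data structure for binary key/value pairs, and it also works as a container for small files. HDFS and MapReduce are optimized for large files. Every HDFS file occupies NameNode memory for its metadata, and MapReduce typically schedules at least one map task per input file. Wrapping many small files in a single SequenceFile therefore makes storing and processing them more efficient.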
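The small-file container idea can be sketched as follows. In this sketch, each small file becomes one record: the key is the file name and the value is the file's raw bytes.

This is a minimal sketch under these assumptions:
- hadoop-common 2.x or later is on the classpath, which provides the option-based `createWriter`/`Reader` overloads and `BytesWritable.copyBytes()`.
- `SmallFilePacker` and its `pack`/`unpack`/`demo` methods are hypothetical names.
- The example writes to the local file system through `file:` URIs rather than to HDFS.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

/** Hypothetical helper that packs many small local files into one SequenceFile. */
public class SmallFilePacker {

    /** One record per input file: key = file name, value = the file's raw bytes. */
    static void pack(Configuration conf, List<java.nio.file.Path> inputs, Path out) throws IOException {
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(out),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class))) {
            for (java.nio.file.Path p : inputs) {
                byte[] data = Files.readAllBytes(p);
                writer.append(new Text(p.getFileName().toString()), new BytesWritable(data));
            }
        }
    }

    /** Reads the container back as "name=contents" strings, in the order they were written. */
    static List<String> unpack(Configuration conf, Path in) throws IOException {
        List<String> result = new ArrayList<>();
        Text key = new Text();
        BytesWritable value = new BytesWritable();
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(in))) {
            while (reader.next(key, value)) {
                // copyBytes() returns only the valid getLength() bytes, not the padded buffer
                result.add(key + "=" + new String(value.copyBytes(), StandardCharsets.UTF_8));
            }
        }
        return result;
    }

    /** Creates three tiny files in a temp directory, packs them, and unpacks them again. */
    public static List<String> demo() throws IOException {
        java.nio.file.Path dir = Files.createTempDirectory("small-files");
        List<java.nio.file.Path> inputs = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            java.nio.file.Path f = dir.resolve("part" + i + ".txt");
            Files.write(f, ("hello " + i).getBytes(StandardCharsets.UTF_8));
            inputs.add(f);
        }
        Configuration conf = new Configuration();
        // a file: URI keeps the container on the local file system instead of HDFS
        Path out = new Path(dir.resolve("packed.seq").toUri());
        pack(conf, inputs, out);
        return unpack(conf, out);
    }

    public static void main(String[] args) throws IOException {
        demo().forEach(System.out::println);
    }
}
```

Using the file name as the key keeps each small file identifiable after packing. A MapReduce job reading the container then sees one record per original file.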
Code for working with SequenceFile, written as JUnit test methods:
package com.jr.sequencefile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.junit.Test;

public class TestSequenceFile {

    /**
     * Write: appends 1000 records (key i, value "tom" + i) in ascending key order.
     * @throws IOException
     */
    @Test
    public void write() throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path name = new Path("/user/centos/hadoop/meseq2.seq");
        IntWritable iw = new IntWritable();
        Text txt = new Text();
        // try-with-resources closes the writer even if append() throws
        try (Writer w = SequenceFile.createWriter(fs, conf, name, IntWritable.class, Text.class)) {
            for (int i = 0; i < 1000; i++) {
                // reuse the same Writable objects instead of allocating two per record
                iw.set(i);
                txt.set("tom" + i);
                w.append(iw, txt);
            }
        }
    }

    /**
     * Read: prints every record as key:value.
     */
    @Test
    public void read() throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path name = new Path("/user/centos/hadoop/meseq2.seq");
        IntWritable key = new IntWritable();
        Text txt = new Text();
        // the reader is now closed when the loop finishes (the original never closed it)
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, name, conf)) {
            // next(key) reads the next key; getCurrentValue() then fills in its value
            while (reader.next(key)) {
                reader.getCurrentValue(txt);
                System.out.println(key.get() + ":" + txt.toString());
            }
        }
    }

    /**
     * Write unordered data: keys are appended as 2, 1, 5, 3.
     * @throws IOException
     */
    @Test
    public void writeNoOrder() throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path name = new Path("/user/centos/hadoop/meseqNoOrder.seq");
        try (Writer w = SequenceFile.createWriter(fs, conf, name, IntWritable.class, Text.class)) {
            w.append(new IntWritable(2), new Text("tom2"));
            w.append(new IntWritable(1), new Text("tom1"));
            w.append(new IntWritable(5), new Text("tom5"));
            w.append(new IntWritable(3), new Text("tom3"));
        }
    }

    /**
     * Sort the SequenceFile by key.
     * @throws IOException
     */
    @Test
    public void sortSeqFile() throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path src = new Path("/user/centos/hadoop/meseqNoOrder.seq");
        Path dest = new Path("/user/centos/hadoop/meseqOrder.seq");
        // create the sorter (keys are IntWritable, values are Text)
        SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, IntWritable.class, Text.class, conf);
        // sort() fails with an IOException if dest already exists, so delete it before re-running
        sorter.sort(src, dest);
    }

    /**
     * Merge files.
     * @throws IOException
     */
    @Test
    public void mergeFile() throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path src = new Path("/user/centos/hadoop/meseqNoOrder.seq");
        Path dest = new Path("/user/centos/hadoop/meseqOrder.seq");
        Path merge = new Path("/user/centos/hadoop/mergeFile.seq");
        // the Sorter object also performs merges
        SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, IntWritable.class, Text.class, conf);
        sorter.merge(new Path[] {src, dest}, merge);
    }
}
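The `read()` test above scans the file from the beginning. A reader can also jump straight to a record if that record's byte offset was remembered at write time. `Writer.getLength()` reports the current output position, and `Reader.seek()` accepts exactly such positions.

This is a minimal sketch under these assumptions:
- hadoop-common 2.x or later is on the classpath.
- The file lives on the local file system.
- `SeekDemo` and its methods are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

/** Hypothetical demo: remember record offsets while writing, then seek to one of them. */
public class SeekDemo {

    /** Writes keys 0..n-1 with values "tom" + i and returns the offset at which each record starts. */
    static long[] writeWithOffsets(Configuration conf, Path file, int n) throws IOException {
        long[] offsets = new long[n];
        IntWritable key = new IntWritable();
        Text value = new Text();
        try (SequenceFile.Writer w = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(file),
                SequenceFile.Writer.keyClass(IntWritable.class),
                SequenceFile.Writer.valueClass(Text.class),
                // no block compression, so every record has its own seekable offset
                SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE))) {
            for (int i = 0; i < n; i++) {
                offsets[i] = w.getLength(); // current output position, before record i
                key.set(i);
                value.set("tom" + i);
                w.append(key, value);
            }
        }
        return offsets;
    }

    /** Seeks to a remembered offset and returns the record found there as "key:value". */
    static String readAt(Configuration conf, Path file, long offset) throws IOException {
        IntWritable key = new IntWritable();
        Text value = new Text();
        try (SequenceFile.Reader r = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file))) {
            r.seek(offset);
            if (!r.next(key, value)) {
                throw new IOException("no record at offset " + offset);
            }
            return key.get() + ":" + value;
        }
    }

    public static String demo() throws IOException {
        Configuration conf = new Configuration();
        Path file = new Path(Files.createTempDirectory("seek-demo").resolve("data.seq").toUri());
        long[] offsets = writeWithOffsets(conf, file, 1000);
        return readAt(conf, file, offsets[42]);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo()); // prints 42:tom42
    }
}
```

Offsets taken from `getLength()` are always valid seek targets. To start reading from an arbitrary byte position instead, the reader would have to resynchronize on the file's sync markers.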
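Results of running each method:
1. public void write(): write 1000 ordered records
2. public void read(): read the data back
3. public void writeNoOrder(): write unordered data
4. public void sortSeqFile(): sort the SequenceFile by key
5. public void mergeFile(): merge files. The merge does not sort the data: SequenceFile.Sorter.merge() combines its inputs on the assumption that each one is already sorted by key. meseqNoOrder.seq is not sorted, so the merged output is not in key order.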
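Because `Sorter.merge()` expects sorted inputs, one way to get a fully ordered result is to sort each file first and then merge the sorted outputs. The sketch uses two small unordered files.

This is a minimal sketch under these assumptions:
- hadoop-common 2.x or later is on the classpath.
- All files live on the local file system, accessed through `FileSystem.getLocal()`.
- `SortThenMerge` and its methods are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

/** Hypothetical demo: sort each unordered input, then merge the sorted files. */
public class SortThenMerge {

    /** Local file: URI for a name inside the temp directory. */
    static Path local(java.nio.file.Path dir, String name) {
        return new Path(dir.resolve(name).toUri());
    }

    /** Writes the given keys in the given (possibly unsorted) order. */
    static void writeKeys(Configuration conf, Path file, int... keys) throws IOException {
        try (SequenceFile.Writer w = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(file),
                SequenceFile.Writer.keyClass(IntWritable.class),
                SequenceFile.Writer.valueClass(Text.class))) {
            for (int k : keys) {
                w.append(new IntWritable(k), new Text("tom" + k));
            }
        }
    }

    /** Returns all keys in file order. */
    static List<Integer> readKeys(Configuration conf, Path file) throws IOException {
        List<Integer> keys = new ArrayList<>();
        IntWritable key = new IntWritable();
        try (SequenceFile.Reader r = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file))) {
            while (r.next(key)) {
                keys.add(key.get());
            }
        }
        return keys;
    }

    public static List<Integer> demo() throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        java.nio.file.Path dir = Files.createTempDirectory("seq-merge");
        Path a = local(dir, "a.seq");
        Path b = local(dir, "b.seq");
        writeKeys(conf, a, 2, 1, 5, 3);
        writeKeys(conf, b, 9, 4, 7);

        SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, IntWritable.class, Text.class, conf);
        Path sortedA = local(dir, "a-sorted.seq");
        Path sortedB = local(dir, "b-sorted.seq");
        sorter.sort(a, sortedA); // each merge input must be sorted by key first
        sorter.sort(b, sortedB);

        Path merged = local(dir, "merged.seq");
        sorter.merge(new Path[] {sortedA, sortedB}, merged);
        return readKeys(conf, merged);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo()); // prints [1, 2, 3, 4, 5, 7, 9]
    }
}
```

Sorting once per input and then merging mirrors how external sorts are usually built. Each sorted run is read sequentially, and the merge only compares the current head key of each run.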