程式人生 > Hadoop學習之HDFS的相關操作

Hadoop學習之HDFS的相關操作

以下是使用 Hadoop 2.4.1 的 Java API 進行 HDFS 相關操作的範例程式碼:

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Formatter;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import org.junit.Test;

import com.yq.common.HdfsUtils;
import com.yq.common.RegexExcludePathFilter;

/**
 * Small collection of HDFS usage examples written against the Hadoop 2.4.1 Java API.
 *
 * <p>Every helper obtains its {@link FileSystem} from {@code HdfsUtils.getFilesystem()} and
 * leaves that handle open.
 * NOTE(review): presumably the handle is shared/cached by HdfsUtils - confirm before adding
 * a close() call here.
 */
public class Dhfs {

    /**
     * Scratch entry point: uncomment one of the calls below to try the matching helper.
     */
    @Test
    public void Test() throws IllegalArgumentException, IOException{

        // Create a directory
        //mkdir(new Path("/test/test/test"));

        // Create a file without writing any content
        //createFile(new Path("/test/test.null"));

        // Create a file and write content into it; an existing file is overwritten
        //createFile(new Path("/test/test.data"), "/home/young/notes/quicksort.txt");

        // List the directories and files under the given directory
        //ll(new Path("/test"));

        // Select files with a glob pattern
        //fileFilter(new Path("/*/*"));

        // Exclude files with a regular expression
        //String regex = "^*ta*" ;
        //fileFilter(new Path("/test/*"), regex);

        // Print the content of a file
        //read(new Path("/test/test.data"));

        // Upload a file
        //upload(new Path("/home/young/notes/quicksort.txt"),new Path("/test/"));

        // Download a file
        //download(new Path("/test/test.data"), new Path("/home/young/"));

        // Delete a file
        //delete(new Path("/test/test.null"));

        // Delete a directory; true deletes recursively, false does not
        //delete(new Path("/test/test/"),true);
    }

    /**
     * Creates a directory.
     *
     * @param path the directory to create
     * @throws IOException if the file system reports an I/O error
     */
    public static void mkdir(Path path) throws IOException{
        FileSystem hdfs = HdfsUtils.getFilesystem();
        hdfs.mkdirs(path);
    }

    /**
     * Creates a file on HDFS and fills it with the content of a local file.
     *
     * @param path the HDFS file to create
     * @param srcPath path of the local file that supplies the data
     * @throws FileNotFoundException if {@code srcPath} cannot be opened; nothing is created
     *         on HDFS in that case
     * @throws IOException if creating the target or copying the data fails
     */
    public static void createFile(Path path, String srcPath) throws IOException{
        FileSystem hdfs = HdfsUtils.getFilesystem();

        // Open the local source first: if it is missing we fail before touching HDFS,
        // instead of leaving an empty target file and an unclosed output stream behind.
        InputStream in = new BufferedInputStream(new FileInputStream(srcPath));

        FSDataOutputStream out = null;
        try {
            out = hdfs.create(path, new Progressable(){
                public void progress(){
                    // Progress callback; reportedly fires about once per 64KB written (unverified).
                    System.out.print(".");
                }
            });
        } finally {
            // create() failed, so copyBytes will never run and close the source for us.
            if (out == null) {
                IOUtils.closeStream(in);
            }
        }

        // close=true: copyBytes closes both streams, whether the copy succeeds or fails.
        IOUtils.copyBytes(in, out, 4096, true);
    }

    /**
     * Creates an empty file.
     *
     * @param path the file to create
     * @throws IOException if the file cannot be created
     */
    public static void createFile(Path path) throws IOException{
        FileSystem hdfs = HdfsUtils.getFilesystem();
        // create() hands back an open output stream; close it immediately so the
        // empty file is completed and the stream is not leaked.
        hdfs.create(path).close();
    }

    /**
     * Prints the entries found under a path, similar to the Linux {@code ll} command:
     * a "Found N items" header followed by one line per entry holding permission,
     * owner, group and full path.
     *
     * @param path the directory (or file) to list
     * @throws FileNotFoundException if the path does not exist
     * @throws IOException if the listing fails
     */
    public static void ll(Path path) throws FileNotFoundException, IOException{
        FileSystem hdfs = HdfsUtils.getFilesystem();
        FileStatus[] fileStatus = hdfs.listStatus(path);

        Formatter format = new Formatter(System.out);
        format.format("%s", "Found " + fileStatus.length + " items\n");

        // First pass: find the column widths. They start at 1 because a width of 0
        // would yield the invalid format specifier "%0s" (e.g. for an empty owner).
        int maxLenPermission = 1;
        int maxLenOwner = 1;
        int maxLenPath = 1;
        for (FileStatus status : fileStatus) {
            maxLenPermission = Math.max(maxLenPermission, permissionString(status).length());
            maxLenOwner = Math.max(maxLenOwner, status.getOwner().length());
            maxLenPath = Math.max(maxLenPath, status.getPath().toString().length());
        }

        // Second pass: print right-aligned columns; the group column is tab separated.
        String rowFormat = "%" + maxLenPermission + "s  %" + maxLenOwner + "s\t%s\t%" + maxLenPath + "s\n";
        for (FileStatus status : fileStatus) {
            format.format(rowFormat, permissionString(status), status.getOwner(),
                    status.getGroup(), status.getPath().toString());
        }

        // Flush instead of close: Formatter.close() would also close System.out and
        // silently discard everything printed to the console afterwards.
        format.flush();
    }

    /**
     * Builds the permission column: "-" for a file or "d" otherwise, followed by the
     * permission string of the entry.
     */
    private static String permissionString(FileStatus status) {
        String type = status.isFile() ? "-" : "d";
        return type + status.getPermission().toString();
    }

    /**
     * Copies the content of a file to standard output.
     *
     * @param path the file to read
     * @throws IOException if the file cannot be opened or read
     */
    public static void read(Path path) throws IOException{
        FileSystem hdfs = HdfsUtils.getFilesystem();
        FSDataInputStream fsDataInputStream = hdfs.open(path);
        try {
            // close=false keeps System.out open; the input stream is closed below instead.
            IOUtils.copyBytes(fsDataInputStream, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(fsDataInputStream);
        }
    }

    /**
     * Uploads a local file.
     *
     * @param srcPath the local source path
     * @param dstPath the destination path on the file system
     * @throws IOException if the copy fails
     */
    public static void upload(Path srcPath, Path dstPath) throws IOException{
        FileSystem hdfs = HdfsUtils.getFilesystem();
        hdfs.copyFromLocalFile(srcPath, dstPath);
    }

    /**
     * Downloads a file to the local file system.
     *
     * @param srcPath the source path on the file system
     * @param dstPath the local destination path
     * @throws IOException if the copy fails
     */
    public static void download(Path srcPath, Path dstPath) throws IOException{
        FileSystem hdfs = HdfsUtils.getFilesystem();
        hdfs.copyToLocalFile(srcPath, dstPath);
    }

    /**
     * Deletes a file.
     *
     * @param path the path to delete
     * @throws IOException if the deletion fails
     */
    public static void delete(Path path) throws IOException{
        FileSystem hdfs = HdfsUtils.getFilesystem();
        // FileSystem.delete(Path) is deprecated in favour of delete(Path, boolean).
        // recursive=true matches what the deprecated overload does in Hadoop 2.x.
        hdfs.delete(path, true);
    }

    /**
     * Deletes a file or a directory.
     *
     * @param path the path to delete
     * @param r {@code true} to delete recursively, {@code false} otherwise
     * @throws IOException if the deletion fails
     */
    public static void delete(Path path, boolean r) throws IOException{
        FileSystem hdfs = HdfsUtils.getFilesystem();
        hdfs.delete(path, r);
    }

    /**
     * Prints every file or directory that matches a glob pattern.
     *
     * @param pathPattern the glob pattern to match
     * @throws IOException if the lookup fails
     */
    public static void fileFilter(Path pathPattern) throws IOException{
        FileSystem hdfs = HdfsUtils.getFilesystem();
        printPaths(hdfs.globStatus(pathPattern));
    }

    /**
     * Prints every file or directory that matches a glob pattern and is accepted by a
     * {@code RegexExcludePathFilter} built from {@code regex}.
     *
     * @param pathPattern the glob pattern to match
     * @param regex the regular expression handed to the exclude filter
     * @throws IOException if the lookup fails
     */
    public static void fileFilter(Path pathPattern, String regex) throws IOException{
        FileSystem hdfs = HdfsUtils.getFilesystem();
        printPaths(hdfs.globStatus(pathPattern, new RegexExcludePathFilter(regex)));
    }

    /**
     * Prints one path per line; a {@code null} array prints nothing.
     */
    private static void printPaths(FileStatus[] fileStatus) {
        // globStatus returns null (not an empty array) when the pattern contains no
        // glob and the path does not exist.
        if (fileStatus == null) {
            return;
        }
        for (FileStatus status : fileStatus) {
            System.out.println("" + status.getPath());
        }
    }
}