1. 程式人生 > 其它 >從本地上傳檔案至HDFS

從本地上傳檔案至HDFS

一、結構化資料直接上傳

    如果我們拿到要上傳的資料是結構化的,那麼就不需要在對資料做處理, 直接從本地上傳到HDFS上即可。

程式碼層面也比較簡單:

public class UploadFileToHDFS {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.145.200:9000"),cfg);

// 自己修改登入hdfs的使用者 也可用本地系統使用者 去hdfs上授權 System.setProperty("HADOOP_USER_NAME","root"); /* 檔案上傳 */ // 獲得要上傳檔案的本地路徑 Path src = new Path("f:/mydata/logs/Log_20200101.log");

// 上傳的路徑 Path dst = new Path("/tmp");

// 上傳命令 fs.copyFromLocalFile(src,dst);

//     釋放資源
fs.close(); } }

二、半結構化 / 非結構化資料轉化後再上傳

    有時我們拿到要上傳的資料不一定是結構化的,可能是半結構化(JSON等)或者非結構化,這樣即便上傳也沒什麼意義。因此,我們先用Java語言,將其結構化了再上傳。

例如:

我們拿到的資料:

我們上傳後的資料;

package com.zyp.myhdfs.services;

import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /* * * todo:多執行緒傳遞多個檔案到HDFS上 * * */ public class UploadFileToHDFS2 { /* * 傳進來路徑 path 和檔名 filename, 將所有檔案全部上傳 * * */ public void writeFileToHDFS(String path,String filename) { FileSystem fs = null; FileReader fis = null; // 輸入流 讀取本地檔案 BufferedReader bis = null; // 對輸入流整行整行讀入 借用BufferedReader FSDataOutputStream fos = null;  // 輸出流 往HDFS上輸出資料 try { fs = FileSystem.get(new URI("hdfs://192.168.145.200:9000"),new Configuration()); fis = new FileReader(path+"/"+filename); bis = new BufferedReader(fis); // 自己修改登入hdfs的使用者 也可用本地系統使用者 去hdfs上授權即可 System.setProperty("HADOOP_USER_NAME","root"); /* 用上面建立好的物件,往上寫資料 */ // 先去建立一個檔案,然後往裡面寫 fos = fs.create(new Path("/logs/"+filename)); String line =""; while ((line = bis.readLine()) !=null ){ Info info = JSON.parseObject(line, Info.class); // System.out.println(info.getGoodid()+","+info.getMachine().getMemory()); // 利用Sting模板 String ctx = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n", //設定好格式,通過對映去資料來源中獲取對應的資料資料資訊 info.getMachine().getCpuType(), info.getMachine().getMemory(), info.getMachine().getCpuSeed(), info.getActTime(), info.getActType(), info.getGoodId(), info.getPage(), info.getUseId(), info.getBrowse().getBrowseType(), info.getBrowse().getBrowseVersion()); fos.write(ctx.getBytes()); } fos.flush(); } catch (IOException e) { e.printStackTrace(); } catch (URISyntaxException e) { e.printStackTrace(); } finally { try { fos.close(); bis.close(); fis.close(); // fs.close(); 這裡關閉通道會影響其他傳輸 } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) { ExecutorService es = Executors.newFixedThreadPool(30); // 固定執行緒池 final UploadFileToHDFS2 ufh = new UploadFileToHDFS2(); final String filePath = "f:/mydata/logs"; // 迴圈獲取所有的子檔案 File file = new File(filePath); String[] fs = file.list(); for (String fileName:fs){ es.execute(new Runnable() { @Override public void run() { ufh.writeFileToHDFS(filePath,fileName); } }); } es.shutdown(); } }

三、分組整合檔案後在上傳至一個檔案裡

    通過對上面在HDFS上的執行效果觀察發現,本地一個檔案可能很小,遠遠不夠一個塊大小(128M),但是他也單獨佔據一個block塊。這樣就造成了很大的資源浪費,這裡考慮,將檔案整合後再上傳,儘可能的節省資源。

例如:

  將一年內的日誌資訊,按月進行分類,HDFS上,每個月一個資料夾,然後往裡面新增對應該月的資料資訊

import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/*
 * path: 原始檔路徑
 * newFileName: hdfs上檔案和資料夾的名字
 * files: [log_20200101.log,log_20200102.log]
 * todo: 考慮到每個檔案佔據一個block塊,太浪費空間 這裡將檔案整合在一起存到HDFS上
 *        按月份  將每個月的檔案都寫到一個資料夾下
 * */

public class MyMergeFile {
    public void batchWriteToHDFS(String path, String hdfsFileName, List<String> files) {
        FSDataOutputStream fos = null;
        try {
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.145.200:9000"), new Configuration());
//        分割hdfs檔案  獲得資料夾名字
            String folderName = hdfsFileName.split("_")[1];

//        自己修改登入hdfs的使用者   也可用本地系統使用者 去hdfs上授權即可
            System.setProperty("HADOOP_USER_NAME", "root");


//        在hdfs下先建立資料夾
            fs.mkdirs(new Path("/logs" + folderName));

//        在資料夾下建立一個檔案
            fos = fs.create(new Path("/logs" + folderName + "/" + hdfsFileName));

//        迴圈讀取檔案 並向hdfs檔案中寫入資料
            for (String localFile : files) {
                BufferedReader br = null;
                try {
                    br = new BufferedReader(new FileReader(path + "/" + localFile));
                    String line = "";
                    while ((line = br.readLine()) != null) {
                        try {
                            Info info = JSON.parseObject(line, Info.class);
                            //            System.out.println(info.getGoodid()+","+info.getMachine().getMemory());
                            //            利用Sting模板
                            String ctx = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n",
                                    info.getMachine().getCpuType(),
                                    info.getMachine().getMemory(),
                                    info.getMachine().getCpuSeed(),
                                    info.getActTime(),
                                    info.getActType(),
                                    info.getGoodId(),
                                    info.getPage(),
                                    info.getUseId(),
                                    info.getBrowse().getBrowseType(),
                                    info.getBrowse().getBrowseVersion());
                            fos.write(ctx.getBytes());
                        } catch (Exception e) {
                            continue; // 防止原始檔裡JSON 格式錯誤 ,繼續往下執行
                        }
                    }
                    fos.flush();
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        br.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        } finally {
            try {
                fos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /*
     *  將一年內的所有檔案整合  分組 上傳到hdfs
     *
     * */
    public static void main(String[] args) {
        /*
         * 正常的檔案分組
         * */
//        File file = new File("f:/mydata/logs");
////        獲取所有檔案
//        String[] files = file.list();
////        按照月份分組  log_202001,log_202002
//        Map<String, List<String>> map = new HashMap<>();
//        for (String fn : files) {
//            String keyName = fn.substring(0,10); // 截取出來所有檔案的月份部分
//            if (map.containsKey(keyName)){   // 判斷該月份是否存在
//                map.get(keyName).add(fn);  // 存在該月份  就直接將檔案存到該 key的list中
//            }else {
//                List<String> lst = new ArrayList<>();  // 不存在  就建立一個list  存放檔名
//                lst.add(fn);
//                map.put(keyName,lst);
//            }
//        }
//        System.out.println(map);


        /*
         * 使用java工具  jdk必須是1.8以上
         * */
        File file = new File("f:/mydata/logs");
//        獲取所有檔案
        String[] files = file.list();
        List<String> lst = Arrays.asList(files);

//        java 流式程式設計
        Map<String, List<String>> mp = lst.stream().collect(
                Collectors.groupingBy(line -> line.substring(0, 10))
        );
//        System.out.println(mp);


//        遍歷hashmap
        ExecutorService es = Executors.newFixedThreadPool(12);
        final MyMergeFile mmf = new MyMergeFile();
        for (String key : mp.keySet()) {
            final String keyName = key;
            final List<String> fs = mp.get(key);
            es.execute(new Runnable() {
                @Override
                public void run() {
                    mmf.batchWriteToHDFS("f:/mydata/logs",keyName,fs);
                }
            });
        }
        es.shutdown();
    }
}