1. 程式人生 > 實用技巧 >深入理解Hadoop讀書筆記-3

深入理解Hadoop讀書筆記-3

背景

公司的物流業務系統目前實現了使用storm叢集進行過門事件的實時計算處理,但是還有一個需求,我們需要儲存每個標籤上傳的每條明細資料,然後進行定期的標籤報表統計,這個是目前的實時計算框架無法滿足的,需要考慮離線儲存和計算引擎。

標籤的資料量是巨大的,此時儲存在mysql中是不合適的,所以我們考慮了分散式儲存系統HDFS。目前考慮的架構是,把每條明細資料儲存到HDFS中,利用Hive或者其他類SQL的解析引擎,定期進行離線統計計算。

查詢相關資料後,我下載了深入理解Haddoop這本書,從大資料的一些基礎原理開始調研,這一系列的筆記就是調研筆記。

系列文章:

深入理解Hadoop讀書筆記1

深入理解Hadoop讀書筆記2

深入理解Hadoop-基礎-HDFS的API使用

深入理解Hadoop讀書筆記2中講解過Hadoop在Linux環境下的部署,在虛擬機器中安裝好Hadoop後,我們需要搭建本地環境來進行開發除錯。

下面會講解Windows下使用IDEA,通過HDFS的JAVA API,來連線並操作虛擬機器中的HDFS建立一個資料夾的具體過程。

IDEA的安裝和破解這裡略過,預設讀者是有一定JAVA開發經驗的。

安裝好IDEA後,新建一個maven的專案,這裡選擇下面的quickstart模板。

建立好專案後,修改pom檔案,增加hadoop-client相關,因為我虛擬機器中安裝的是2.10.0版本,所以這裡也使用相同的客戶端版本。

 <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.10.0</version>
 </dependency>

然後建立一個Class檔案,新增下面的程式碼,即可實現,遠端連線HDFS並在其中新建一個路徑為/hdfs/test的資料夾。

這裡有幾個要點需要注意:

  1. HDFS進行偽分散式部署的時候,core-site.xml中填寫的是localhost,現在需要修改為虛擬機器的ip地址,否則使用下面程式碼連線HDFS時會報下面的異常。

    failed on connection exception: java.net.ConnectException:Connection refused
    
    <configuration>
    <property>
            <name>fs.defaultFS</name>
            <value>hdfs://192.168.202.129:9000</value>
        </property>
    </configuration>
    
    
  2. 使用者名稱需要填寫為HDFS中的資料夾擁有者的使用者名稱

    用命令檢視下可知,資料夾的所有者為ging

    ging@ubuntu:~/hadoop/hadoop-2.10.0$ bin/hdfs dfs -ls /
    Found 3 items
    drwxr-xr-x   - ging supergroup          0 2020-09-01 23:47 /hdfs
    drwx------   - ging supergroup          0 2020-08-30 20:26 /tmp
    drwxr-xr-x   - ging supergroup          0 2020-08-30 20:27 /user
    

完整程式碼如下:

package org.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class HDFSApp {

    public static void main(String[] args) throws Exception {
        //注意URI中為虛擬機器中的HDFS的地址和埠
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.202.129:9000"), new Configuration(), "ging");
        //給定一個路徑,新建一個資料夾,並列印返回結果
        boolean result = fileSystem.mkdirs(new Path("/hdfs/test"));
        System.out.println(result);
    }
}

返回結果:

Connected to the target VM, address: '127.0.0.1:59339', transport: 'socket'
true
Disconnected from the target VM, address: '127.0.0.1:59339', transport: 'socket'

除了上面用來演示講解的建立資料夾功能,下面的程式碼還記錄了,常用的基本HDFS的JAVA API,可以參考

package org.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.*;
import java.net.URI;

/**
 * Unit test for simple App.
 */
public class AppTest {

    public static final String HDFS = "hdfs://192.168.202.129:9000";
    private Configuration configuration;
    private FileSystem fileSystem;

    @Before
    public void setUp() throws Exception {
        System.out.println("---setup---");
        configuration = new Configuration();
        fileSystem = FileSystem.get(new URI(HDFS), configuration, "ging");
    }

    @Test
    public void mkdirs() throws IOException {
        boolean result = fileSystem.mkdirs(new Path("/hdfs/test/test"));
        System.out.println(result);
    }

    @Test
    public void text() throws Exception {
        FSDataInputStream inputStream = fileSystem.open(new Path("/user/ging/input/core-site.xml"));
        IOUtils.copyBytes(inputStream, System.out, 1024);
    }

    @Test
    public void create() throws Exception {
        FSDataOutputStream outputStream = fileSystem.create(new Path("/hdfs/test/test/a.txt"));
        outputStream.writeUTF("hello world");
        outputStream.flush();
        outputStream.close();
    }

    @Test
    public void rename() throws Exception {
        Path oldPath = new Path("/hdfs/test/test/a.txt");
        Path newPath = new Path("/hdfs/test/test/b.txt");
        boolean rename = fileSystem.rename(oldPath, newPath);
        System.out.println(rename);
    }


    @Test
    public void copyFromLocal() throws Exception {
        Path local = new Path("C:\\Users\\wgg96\\Documents\\personal-code\\suanfa-note\\CMakeLists.txt");
        Path remote = new Path("/hdfs/test/test/c.txt");
        fileSystem.copyFromLocalFile(local, remote);
    }

    @Test
    public void copyBigFileWithProgress() throws Exception {
        InputStream inputStream = new BufferedInputStream(new FileInputStream(new File("C:\\Users\\wgg96\\Documents\\安裝包\\開發\\jdk-8u251-windows-x64.exe")));

        FSDataOutputStream outputStream = fileSystem.create(new Path("/hdfs/test/test/d.exe"), new Progressable() {
            @Override
            public void progress() {
                System.out.print(".");
            }
        });

        IOUtils.copyBytes(inputStream, outputStream, 4096);
    }


    /**
     * TODO 這裡在windows系統下跑會報異常
     *
     * @throws Exception
     */
    @Test
    public void copyToLocal() throws Exception {
        fileSystem.copyToLocalFile(new Path("/hdfs/test/test/d.exe"), new Path("/tmp/d.exe"));
    }


    @Test
    public void listFile() throws Exception {
        FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/hdfs/test/test/"));
        for (FileStatus fileStatus : fileStatuses) {
            String isDir = fileStatus.isDirectory() ? "資料夾" : "檔案";
            String permission = fileStatus.getPermission().toString();
            short replication = fileStatus.getReplication();
            String owner = fileStatus.getOwner();
            String group = fileStatus.getGroup();
            String path = fileStatus.getPath().toString();
            System.out.println(isDir + "\t" +
                    permission + "\t" +
                    replication + "\t" +
                    owner + "\t" +
                    group + "\t" +
                    path + "\t"
            );
        }
    }


    @After
    public void tearDown() {
        configuration = null;
        fileSystem = null;
        System.out.println("---teardown---");
    }
}