HADOOP Java API 之 HDFS2.x操作
阿新 • 發佈:2019-01-26
Java api操作hdfs2.x, 主要包括以下幾個方法:
- 1. create dir
- 2.1 create file(don’t use IOUtils)
- 2.2 create file(use IOUtils)
- 3. upload local file(s)
- 4. rename file(s)
- 5. delete file(s)
- 6. scan dirs and file information
- 7. 查詢某個檔案在HDFS叢集的位置
- 8. 獲取HDFS叢集上所有節點名稱資訊
程式碼demo如下
package com.david.bigdata.hadoop2x.hads_api;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
/**
* Created by david on 16/11/13.
*/
/**
 * Demonstrates common HDFS 2.x operations via the Java {@link FileSystem} API
 * against {@code hdfs://localhost:9000}: directory creation, file creation
 * (manual copy loop and IOUtils), local upload, rename, delete, directory
 * listing, block-location lookup, and datanode enumeration.
 *
 * <p>Each demo step handles its own {@link IOException} and prints a banner so
 * a failure in one step does not abort the remaining steps.
 */
public class HdfsCURDTest {

    private static final String HDFS_URI = "hdfs://localhost:9000";
    private static final String SEPARATOR = "\n-------------昏鴿線--------------\n";
    private static final int BUFFER_SIZE = 1024;

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", HDFS_URI);
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");

        FileSystem hdfs;
        try {
            hdfs = FileSystem.get(conf);
            System.out.println("connect HDFS: " + new URI(HDFS_URI));
        } catch (Exception e) {
            // Original code fell through with hdfs == null and NPE'd on the
            // first use; bail out instead, preserving the failure cause.
            System.err.println("Error on connect HDFS");
            e.printStackTrace();
            return;
        }

        try {
            createDir(hdfs);
            createFileManually(hdfs);
            createFileWithIoUtils(hdfs);
            uploadLocalFiles(hdfs);
            renameFile(hdfs);
            deleteFile(hdfs);
            listDirectory(hdfs);
            showBlockLocations(hdfs);
            showDatanodes(hdfs);
        } finally {
            // Always release the FileSystem handle, even if a demo step threw
            // an unchecked exception.
            try {
                hdfs.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Step 1: create /liuwei0376 if it does not already exist. */
    private static void createDir(FileSystem hdfs) {
        System.out.println(SEPARATOR);
        System.out.println("\n-------------hdfs操作之: 1. create dir--------------\n");
        try {
            Path dir = new Path("/liuwei0376");
            if (!hdfs.exists(dir)) {
                hdfs.mkdirs(dir);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Step 2.1: copy a local file into HDFS using a manual read/write loop. */
    private static void createFileManually(FileSystem hdfs) {
        System.out.println(SEPARATOR);
        System.out.println("\n-------------hdfs操作之: 2.1 create file(don't use IOUtils)--------------\n");
        String fsrc = "/Users/david/Downloads/accounts.json";
        Path path = new Path("/liuwei0376/mr/accounts.json");
        // try-with-resources guarantees both streams close in every code path
        // (the original hand-rolled finally blocks are no longer needed).
        try (FileInputStream fis = new FileInputStream(fsrc);
             FSDataOutputStream out = hdfs.create(path)) {
            byte[] buff = new byte[BUFFER_SIZE];
            int readCount;
            while ((readCount = fis.read(buff)) != -1) {
                out.write(buff, 0, readCount);
            }
            System.out.println(path + " create is over");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Step 2.2: copy a local file into HDFS using {@link IOUtils#copyBytes}. */
    private static void createFileWithIoUtils(FileSystem hdfs) {
        System.out.println(SEPARATOR);
        System.out.println("\n-------------hdfs操作之: 2.2 create file(use IOUtils)--------------\n");
        // try-with-resources closes both streams even if hdfs.create succeeds
        // but opening the local file fails (the original leaked out2 there).
        try (FSDataOutputStream out = hdfs.create(new Path("/liuwei0376/mr/dependency.txt"));
             FileInputStream in = new FileInputStream("/Users/david/Downloads/dependency.txt")) {
            // close=false: the try-with-resources block already owns both streams.
            IOUtils.copyBytes(in, out, BUFFER_SIZE, false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Step 3: upload a local file tree into HDFS.
     *
     * <p>copyFromLocalFile(delSrc, overwrite, src, dst):
     * delSrc=true deletes the local source after the copy;
     * overwrite=true replaces an existing destination.
     */
    private static void uploadLocalFiles(FileSystem hdfs) {
        System.out.println(SEPARATOR);
        System.out.println("\n-------------hdfs操作之: 3. upload local file(s)--------------\n");
        try {
            hdfs.copyFromLocalFile(
                    true,
                    true,
                    new Path("/Users/david/Downloads/selenium-java-3.0.0-beta2"),
                    new Path("/liuwei0376/mr/selenium-java-3.0.0-beta2")
            );
        } catch (IOException e) {
            System.out.println("error in copyFromLocalFile");
            e.printStackTrace();
        }
    }

    /** Step 4: rename a path within HDFS via {@link FileSystem#rename}. */
    private static void renameFile(FileSystem hdfs) {
        System.out.println(SEPARATOR);
        System.out.println("\n-------------hdfs操作之: 4. rename file(s)--------------\n");
        try {
            hdfs.rename(
                    new Path("/liuwei0376/mr/selenium-java-3.0.0-beta2"),
                    new Path("/liuwei0376/mr/selenium-java-3.0.0-beta3")
            );
        } catch (IOException e) {
            System.err.println("hdfs.rename error");
            e.printStackTrace();
        }
    }

    /**
     * Step 5: delete a path. (The original banner said "rename" — this step
     * actually deletes; recursive=true removes directories and their contents.)
     */
    private static void deleteFile(FileSystem hdfs) {
        System.out.println(SEPARATOR);
        System.out.println("\n-------------hdfs操作之: 5. delete file(s)--------------\n");
        try {
            hdfs.delete(new Path("/liuwei0376/mr/selenium-java-3.0.0-beta3"), true);
        } catch (IOException e) {
            System.err.println("hdfs.delete error");
            e.printStackTrace();
        }
    }

    /** Step 6: list a directory and print each entry's metadata. */
    private static void listDirectory(FileSystem hdfs) {
        System.out.println(SEPARATOR);
        System.out.println("\n-------------hdfs操作之: 6. scan dirs and file information--------------\n");
        try {
            FileStatus[] fstatus = hdfs.listStatus(new Path("/user/david/hadoop_java_files"));
            System.out.println(fstatus.length);
            int i = 0;
            for (FileStatus fs : fstatus) {
                System.out.println("\n------- " + ++i + " -------");
                System.out.println("fs.getAccessTime() = " + fs.getAccessTime());
                System.out.println("fs.getGroup() = " + fs.getGroup());
                System.out.println("fs.getOwner() = " + fs.getOwner());
                System.out.println("fs.getBlockSize() = " + fs.getBlockSize());
                System.out.println("fs.getLen() = " + fs.getLen());
                System.out.println("fs.getModificationTime() = " + fs.getModificationTime());
                System.out.println("fs.getPath() = " + fs.getPath());
                System.out.println("fs.getPermission() = " + fs.getPermission());
                System.out.println("fs.getReplication() = " + fs.getReplication());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Step 7: locate a file's blocks on the cluster.
     *
     * <p>Note: the path must be a file, not a directory.
     */
    private static void showBlockLocations(FileSystem hdfs) {
        System.out.println(SEPARATOR);
        System.out.println("\n-------------hdfs操作之: 7. 查詢某個檔案在HDFS叢集的位置--------------\n");
        try {
            FileStatus fs7 = hdfs.getFileStatus(new Path("/liuwei0376/mr/accounts.json"));
            BlockLocation[] blockLocations = hdfs.getFileBlockLocations(fs7, 0, fs7.getLen());
            for (int j = 0; j < blockLocations.length; j++) {
                String[] hosts = blockLocations[j].getHosts();
                System.out.println("block_" + j + "_location: " + hosts[0]);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Step 8: print the hostname of every datanode in the cluster. */
    private static void showDatanodes(FileSystem hdfs) {
        System.out.println(SEPARATOR);
        System.out.println("\n-------------hdfs操作之: 8. 獲取HDFS叢集上所有節點名稱資訊--------------\n");
        // Guard the downcast: getDataNodeStats only exists on
        // DistributedFileSystem (the original cast could throw CCE for a
        // LocalFileSystem, e.g. when fs.defaultFS is misconfigured).
        if (!(hdfs instanceof DistributedFileSystem)) {
            System.err.println("not a DistributedFileSystem, cannot list datanodes");
            return;
        }
        DistributedFileSystem distributedFileSystem = (DistributedFileSystem) hdfs;
        try {
            DatanodeInfo[] datanodeInfos = distributedFileSystem.getDataNodeStats();
            for (int n = 0; n < datanodeInfos.length; n++) {
                System.out.println("datanode_" + n + "_name: " + datanodeInfos[n].getHostName());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}