Java實現HDFS增刪改查
阿新 • • 發佈:2019-01-28
環境:Hadoop 2.7.3
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.zookeeper.common.IOUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
/**
 * Helper methods for basic HDFS operations: creating and deleting directories, listing a
 * directory, uploading a local file, and creating, reading, appending to and deleting files.
 *
 * <p>Every method takes the HDFS address (for example {@code hdfs://192.168.12.12}) and a path
 * on that file system (for example {@code /tmp/testdir}); the two strings are concatenated to
 * form the fully qualified path. Each call obtains a {@link FileSystem}, uses it, and closes it
 * before returning, including when the operation fails.
 *
 * <p>NOTE(review): Hadoop may hand out a shared, cached {@code FileSystem} instance; closing it
 * here would then affect other users of that instance in the same JVM. Confirm this is
 * acceptable for the calling application.
 *
 * <p>Created by LCY on 4/2/2018.
 */
public class HDFSUtils {

    /**
     * Creates a directory on HDFS if it does not exist yet.
     *
     * @param uri HDFS address, e.g. {@code hdfs://192.168.12.12}
     * @param dir directory path, e.g. {@code /tmp/testdir}
     * @return {@code true} if the path already exists or was created; {@code false} if
     *         {@code dir} is blank, the directory could not be created, or an I/O error occurred
     */
    public boolean mkdir(String uri, String dir) {
        if (StringUtils.isBlank(dir)) {
            return false;
        }
        Path target = new Path(uri + dir);
        // Resolve the file system from the Path rather than URI.create(String): the latter
        // rejects characters (such as spaces) that are legal in HDFS paths.
        try (FileSystem fs = target.getFileSystem(new Configuration())) {
            // Report the real outcome of mkdirs instead of assuming success.
            return fs.exists(target) || fs.mkdirs(target);
        } catch (IOException e) {
            System.err.println("ERROR:IO");
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Recursively deletes a directory on HDFS.
     *
     * @param uri HDFS address, e.g. {@code hdfs://192.168.12.12}
     * @param dir directory path
     * @return the result reported by {@link FileSystem#delete(Path, boolean)};
     *         {@code false} if {@code dir} is blank
     * @throws IOException if the file system cannot be reached or the delete fails
     */
    public boolean deleteDir(String uri, String dir) throws IOException {
        if (StringUtils.isBlank(dir)) {
            return false;
        }
        Path target = new Path(uri + dir);
        try (FileSystem fs = target.getFileSystem(new Configuration())) {
            return fs.delete(target, true);
        }
    }

    /**
     * Lists the entries directly under a directory.
     *
     * @param uri HDFS address, e.g. {@code hdfs://192.168.12.12}
     * @param dir directory path
     * @return the fully qualified path of every entry (files, directories and symlinks);
     *         an empty list if {@code dir} is blank
     * @throws IOException if the listing fails
     */
    public List<String> listAll(String uri, String dir) throws IOException {
        List<String> names = new ArrayList<String>();
        if (StringUtils.isBlank(dir)) {
            return names;
        }
        Path target = new Path(uri + dir);
        try (FileSystem fs = target.getFileSystem(new Configuration())) {
            // Files, directories and symlinks are all reported the same way, so no
            // per-type branching is needed.
            for (FileStatus status : fs.listStatus(target)) {
                names.add(status.getPath().toString());
            }
        }
        return names;
    }

    /**
     * Uploads a local file to HDFS. Both paths must be complete paths.
     *
     * @param uri       HDFS address, e.g. {@code hdfs://192.168.12.12}
     * @param localFile local file path, e.g. {@code F:/test.txt} or {@code /usr/local/test.txt}
     * @param hdfsFile  destination path on HDFS, e.g. {@code /tmp/dir}
     * @return {@code true} on success; {@code false} if an argument is blank or the copy failed
     *         (the failure is printed to standard error, not rethrown)
     */
    public boolean uploadLocalFile2HDFS(String uri, String localFile, String hdfsFile) {
        if (StringUtils.isBlank(localFile) || StringUtils.isBlank(hdfsFile)) {
            return false;
        }
        try (FileSystem hdfs = FileSystem.get(URI.create(uri), new Configuration())) {
            hdfs.copyFromLocalFile(new Path(localFile), new Path(uri + hdfsFile));
            return true;
        } catch (Exception e) {
            // Best-effort by design: any failure is reported and turned into "false".
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Creates a new file on HDFS and writes {@code content} to it, encoded as UTF-8.
     *
     * @param uri     HDFS address, e.g. {@code hdfs://192.168.12.12}
     * @param newFile file path, e.g. {@code /tmp/test.txt}
     * @param content text to write
     * @return {@code true} on success; {@code false} if {@code newFile} is blank or
     *         {@code content} is {@code null}
     * @throws IOException if the file cannot be created or written
     */
    public boolean createNewHDFSFile(String uri, String newFile, String content) throws IOException {
        if (StringUtils.isBlank(newFile) || content == null) {
            return false;
        }
        Path target = new Path(uri + newFile);
        try (FileSystem hdfs = target.getFileSystem(new Configuration());
             FSDataOutputStream out = hdfs.create(target)) {
            out.write(content.getBytes("UTF-8"));
        }
        return true;
    }

    /**
     * Deletes a file on HDFS.
     *
     * @param uri      HDFS address, e.g. {@code hdfs://192.168.12.12}
     * @param hdfsFile complete file path, e.g. {@code /tmp/test.txt}
     * @return the result reported by {@link FileSystem#delete(Path, boolean)};
     *         {@code false} if {@code hdfsFile} is blank
     * @throws IOException if the delete fails
     */
    public boolean deleteHDFSFile(String uri, String hdfsFile) throws IOException {
        if (StringUtils.isBlank(hdfsFile)) {
            return false;
        }
        Path target = new Path(uri + hdfsFile);
        try (FileSystem hdfs = target.getFileSystem(new Configuration())) {
            return hdfs.delete(target, true);
        }
    }

    /**
     * Reads the whole content of a file on HDFS into memory.
     *
     * @param uri      HDFS address, e.g. {@code hdfs://192.168.12.12}
     * @param hdfsFile complete file path, e.g. {@code /tmp/test.txt}
     * @return the file content, or {@code null} if {@code hdfsFile} is blank
     * @throws Exception if the file does not exist, is too large to fit in a single byte
     *                   array, or cannot be read
     */
    public byte[] readHDFSFile(String uri, String hdfsFile) throws Exception {
        if (StringUtils.isBlank(hdfsFile)) {
            return null;
        }
        Path target = new Path(uri + hdfsFile);
        try (FileSystem fs = target.getFileSystem(new Configuration())) {
            if (!fs.exists(target)) {
                throw new Exception("the file is not found .");
            }
            // The buffer is sized from the reported file length, which is a long.
            long length = fs.getFileStatus(target).getLen();
            if (length > Integer.MAX_VALUE) {
                throw new IOException("file is too large to read into memory: "
                        + target + " (" + length + " bytes)");
            }
            byte[] buffer = new byte[(int) length];
            try (FSDataInputStream in = fs.open(target)) {
                in.readFully(0, buffer);
            }
            return buffer;
        }
    }

    /**
     * Appends {@code content}, encoded as UTF-8, to a file on HDFS. If the file does not exist
     * it is created with {@code content} as its initial content.
     *
     * @param uri      HDFS address, e.g. {@code hdfs://192.168.12.12}
     * @param hdfsFile file path, e.g. {@code /tmp/test.txt}
     * @param content  text to append
     * @return {@code false} if {@code hdfsFile} is blank; {@code true} otherwise (including
     *         when {@code content} is {@code null} or empty, in which case nothing is written)
     * @throws Exception if the file cannot be appended to or created
     */
    public boolean append(String uri, String hdfsFile, String content) throws Exception {
        if (StringUtils.isBlank(hdfsFile)) {
            return false;
        }
        if (StringUtils.isEmpty(content)) {
            return true;
        }
        Path target = new Path(uri + hdfsFile);
        Configuration conf = new Configuration();
        // solve the problem when appending at single datanode hadoop env
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");

        boolean exists;
        try (FileSystem fs = target.getFileSystem(conf)) {
            exists = fs.exists(target);
            if (exists) {
                try (FSDataOutputStream out = fs.append(target)) {
                    // Same encoding as createNewHDFSFile, so a file is never a mix of charsets.
                    out.write(content.getBytes("UTF-8"));
                }
            }
        }
        if (!exists) {
            // Pass the path WITHOUT the uri prefix: createNewHDFSFile adds the prefix itself.
            createNewHDFSFile(uri, hdfsFile, content);
        }
        return true;
    }
}
呼叫:
/**
 * Usage example: uploads the local file {@code data/ITEM.txt} to {@code /tmp/test} on the HDFS
 * instance at {@code hdfs://172.16.0.108}.
 */
public class test {
    @Test
    public void update() {
        String uri = "hdfs://172.16.0.108";
        HDFSUtils hdfsUtils = new HDFSUtils();
        boolean uploaded = hdfsUtils.uploadLocalFile2HDFS(uri, "data/ITEM.txt", "/tmp/test");
        // uploadLocalFile2HDFS reports failures only through its return value, so the
        // result has to be checked or this test could never fail.
        if (!uploaded) {
            throw new AssertionError("upload of data/ITEM.txt to " + uri + "/tmp/test failed");
        }
    }
}