Integrating Spring and Hadoop HDFS in Java
First, add the Hadoop configuration file hbase-site.xml. Only the ZooKeeper address and port need to be configured here.
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>127.0.0.1</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
    </property>
</configuration>
Then add the following to the Spring configuration file, so that Spring injects the hadoopConfiguration for us:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task"
    xmlns:cache="http://www.springframework.org/schema/cache" xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd
        http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd
        http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

    <!-- The default Hadoop configuration. Its bean ID defaults to hadoopConfiguration,
         and elements such as hdp:file-system pick it up automatically without an explicit ref -->
    <hdp:configuration resources="classpath:hadoop/hbase-site.xml" />
    <hdp:hbase-configuration configuration-ref="hadoopConfiguration" />

    <!-- The Hadoop HDFS FileSystem bean, used to read and write HDFS files -->
    <hdp:file-system id="hadoopFile" configuration-ref="hadoopConfiguration" />

    <!-- Configure HbaseTemplate -->
    <bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate">
        <property name="configuration" ref="hbaseConfiguration" />
    </bean>
</beans>
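With these beans in place, the FileSystem registered as hadoopFile can be pulled straight from the Spring container and used against the cluster. A minimal sketch, assuming the context file is named applicationContext.xml (a hypothetical name, not from the original setup):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringHdfsDemo {
    public static void main(String[] args) throws Exception {
        // Load the Spring context shown above (the file name is an assumption)
        ClassPathXmlApplicationContext ctx =
                new ClassPathXmlApplicationContext("applicationContext.xml");
        try {
            // "hadoopFile" is the FileSystem bean declared via <hdp:file-system>
            FileSystem fs = (FileSystem) ctx.getBean("hadoopFile");
            System.out.println("HDFS root exists: " + fs.exists(new Path("/")));
        } finally {
            ctx.close();
        }
    }
}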
The configuration is fairly simple. Below is the key HDFS utility class.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
/**
 * HDFS utility class
 *
 * @author kokJuis
 * @version 1.0
 * @date 2016-12-12
 * @email [email protected]
 */
public class HDFSUtil {

    private HDFSUtil() {
        // utility class, no instances
    }

    // Hadoop FileSystem configuration
    static Configuration conf = new Configuration(true);

    static {
        // Explicitly set the HDFS implementation and the NameNode address
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000");
    }
    /**
     * Check whether a path exists.
     *
     * @param path
     * @return
     * @throws IOException
     */
    public static boolean exists(String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(new Path(path));
    }
    /**
     * Create a file with the given content.
     *
     * @param filePath
     * @param contents
     * @throws IOException
     */
    public static void createFile(String filePath, byte[] contents)
            throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(filePath);
        FSDataOutputStream outputStream = fs.create(path);
        outputStream.write(contents);
        outputStream.close();
        fs.close();
    }
    /**
     * Create a file with the given string content.
     *
     * @param filePath
     * @param fileContent
     * @throws IOException
     */
    public static void createFile(String filePath, String fileContent)
            throws IOException {
        createFile(filePath, fileContent.getBytes());
    }
    /**
     * Upload a local file to HDFS.
     *
     * @param localFilePath
     * @param remoteFilePath
     * @throws IOException
     */
    public static void copyFromLocalFile(String localFilePath,
            String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path localPath = new Path(localFilePath);
        Path remotePath = new Path(remoteFilePath);
        // delSrc = false (keep the local file), overwrite = true
        fs.copyFromLocalFile(false, true, localPath, remotePath);
        fs.close();
    }
    /**
     * Delete a directory or file.
     *
     * @param remoteFilePath
     * @param recursive
     * @return
     * @throws IOException
     */
    public static boolean deleteFile(String remoteFilePath, boolean recursive)
            throws IOException {
        FileSystem fs = FileSystem.get(conf);
        boolean result = fs.delete(new Path(remoteFilePath), recursive);
        fs.close();
        return result;
    }
    /**
     * Delete a directory or file (cascading into subdirectories if present).
     *
     * @param remoteFilePath
     * @return
     * @throws IOException
     */
    public static boolean deleteFile(String remoteFilePath) throws IOException {
        return deleteFile(remoteFilePath, true);
    }
    /**
     * Rename a file.
     *
     * @param oldFileName
     * @param newFileName
     * @return
     * @throws IOException
     */
    public static boolean renameFile(String oldFileName, String newFileName)
            throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path oldPath = new Path(oldFileName);
        Path newPath = new Path(newFileName);
        boolean result = fs.rename(oldPath, newPath);
        fs.close();
        return result;
    }
    /**
     * Create a directory.
     *
     * @param dirName
     * @return
     * @throws IOException
     */
    public static boolean createDirectory(String dirName) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path dir = new Path(dirName);
        boolean result = false;
        if (!fs.exists(dir)) {
            result = fs.mkdirs(dir);
        }
        fs.close();
        return result;
    }
    /**
     * List all files (excluding directories) under the given path.
     * The FileSystem is deliberately left open here, because the
     * returned iterator still needs it to fetch entries.
     *
     * @param basePath
     * @param recursive
     */
    public static RemoteIterator<LocatedFileStatus> listFiles(String basePath,
            boolean recursive) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.listFiles(new Path(basePath), recursive);
    }
    /**
     * List the files under the given path (non-recursive).
     * The FileSystem must not be closed here: the returned iterator
     * fetches entries lazily and would fail on a closed FileSystem.
     *
     * @param basePath
     * @return
     * @throws IOException
     */
    public static RemoteIterator<LocatedFileStatus> listFiles(String basePath)
            throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.listFiles(new Path(basePath), false);
    }
    /**
     * List file and subdirectory information under a directory (non-recursive).
     *
     * @param dirPath
     * @return
     * @throws IOException
     */
    public static FileStatus[] listStatus(String dirPath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] fileStatuses = fs.listStatus(new Path(dirPath));
        fs.close();
        return fileStatuses;
    }
    /**
     * Read the content of a file.
     *
     * @param filePath
     * @return
     * @throws IOException
     */
    public static byte[] readFile(String filePath) throws IOException {
        byte[] fileContent = null;
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(filePath);
        if (fs.exists(path)) {
            InputStream inputStream = null;
            ByteArrayOutputStream outputStream = null;
            try {
                inputStream = fs.open(path);
                outputStream = new ByteArrayOutputStream(
                        inputStream.available());
                IOUtils.copyBytes(inputStream, outputStream, conf);
                fileContent = outputStream.toByteArray();
            } finally {
                IOUtils.closeStream(inputStream);
                IOUtils.closeStream(outputStream);
                fs.close();
            }
        }
        return fileContent;
    }
    /**
     * Download a file from HDFS to the local file system.
     *
     * @param remote
     * @param local
     * @throws IOException
     */
    public static void download(String remote, String local) throws IOException {
        Path path = new Path(remote);
        FileSystem fs = FileSystem.get(conf);
        fs.copyToLocalFile(path, new Path(local));
        System.out.println("download: from " + remote + " to " + local);
        fs.close();
    }
}
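To show how the class is meant to be called, here is a small round-trip sketch; the path is hypothetical and assumes the NameNode configured above is reachable:

public class RoundTripDemo {
    public static void main(String[] args) throws Exception {
        String path = "/user/test/demo.txt"; // hypothetical path
        HDFSUtil.createFile(path, "hello hdfs");             // write
        System.out.println(new String(HDFSUtil.readFile(path))); // read back
        HDFSUtil.deleteFile(path);                           // clean up
    }
}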
One thing to note: when using HDFS, you must explicitly specify the Hadoop fs address. If you don't, uploads will still appear to succeed, but HDFS is not actually being used; the writes go to the default (local) file system instead.
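A quick way to verify which file system a Configuration actually resolves to is to print the FileSystem URI. A minimal sketch (my own, assuming the same NameNode address as above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class FsCheck {
    public static void main(String[] args) throws Exception {
        // Without an explicit fs.defaultFS this prints file:/// (the local file system)
        Configuration conf = new Configuration(true);
        System.out.println(FileSystem.get(conf).getUri());

        // After pointing fs.defaultFS at the NameNode it prints hdfs://127.0.0.1:9000
        conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000");
        System.out.println(FileSystem.get(conf).getUri());
    }
}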
Below is an example of downloading a file.
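A minimal sketch using the HDFSUtil class above, with hypothetical remote and local paths:

public class DownloadDemo {
    public static void main(String[] args) throws Exception {
        String remote = "/user/test/hello.txt"; // hypothetical HDFS path
        String local = "/tmp/hello.txt";        // hypothetical local path
        if (HDFSUtil.exists(remote)) {
            HDFSUtil.download(remote, local);
        }
    }
}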
---------------------
Author: 西門吹水_
Source: CSDN
Original: https://blog.csdn.net/KokJuis/article/details/53586406
Copyright notice: this is the blogger's original article; please include a link to the original when reposting.