Reading Hadoop cluster information with the HDFS Java client API
This article walks through configuring and using the HDFS Java API.
1. First, resolve the dependency in your pom.xml:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.2</version>
    <scope>provided</scope>
</dependency>
2. A properties file holds the HDFS cluster settings. The values come from core-site.xml and hdfs-site.xml and can be filled in from the client-side configuration files of your HDFS cluster. Note that the hdfs. prefix is just a namespace for this properties file; the real Hadoop keys set on the Configuration in step 3 do not carry it.
#============== hadoop ===================
hdfs.fs.defaultFS=hdfs://mycluster-tj
hdfs.ha.zookeeper.quorum=XXXX-apache00.XX01,XXXX-apache01.XX01,XXXX-apache02.XX01
hdfs.dfs.nameservices=XXXX
hdfs.dfs.ha.namenodes.mycluster-tj=XX1,XX2
hdfs.dfs.namenode.rpc-address.mycluster-tj.nn1=XXXX-apachenn01.XX01:8020
hdfs.dfs.namenode.rpc-address.mycluster-tj.nn2=XXXX-apachenn02.XX01:8020
3. The Java client API:
import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopClient {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    private FileSystem fs;
    private String defaultFS;
    private String zKQuorum;
    private String nameServices;
    private String nameNodes;
    private String rpcAddressNN1;
    private String rpcAddressNN2;

    public void setDefaultFS(String defaultFS) { this.defaultFS = defaultFS; }
    public String getDefaultFS() { return defaultFS; }
    public void setZKQuorum(String zKQuorum) { this.zKQuorum = zKQuorum; }
    public String getzKQuorum() { return zKQuorum; }
    public void setNameServices(String nameServices) { this.nameServices = nameServices; }
    public String getNameServices() { return nameServices; }
    public void setNameNodes(String nameNodes) { this.nameNodes = nameNodes; }
    public String getNameNodes() { return nameNodes; }
    public void setRpcAddressNN1(String rpcAddressNN1) { this.rpcAddressNN1 = rpcAddressNN1; }
    public String getRpcAddressNN1() { return rpcAddressNN1; }
    public void setRpcAddressNN2(String rpcAddressNN2) { this.rpcAddressNN2 = rpcAddressNN2; }
    public String getRpcAddressNN2() { return rpcAddressNN2; }

    public void init() {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", defaultFS);
            conf.set("ha.zookeeper.quorum", zKQuorum);
            // The correct key is "dfs.nameservices" (plural); the singular
            // "dfs.nameservice" is silently ignored by Hadoop.
            conf.set("dfs.nameservices", nameServices);
            conf.set("dfs.ha.namenodes.mycluster-tj", nameNodes);
            conf.set("dfs.namenode.rpc-address.mycluster-tj.nn1", rpcAddressNN1);
            conf.set("dfs.namenode.rpc-address.mycluster-tj.nn2", rpcAddressNN2);
            // Note: HA clusters usually also require a failover proxy provider, e.g.
            // conf.set("dfs.client.failover.proxy.provider.mycluster-tj",
            //     "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
            fs = FileSystem.get(new URI(defaultFS), conf);
        } catch (Exception ex) {
            logger.error("init error", ex);
        }
    }

    public void stop() {
        try {
            fs.close();
        } catch (Exception e) {
            logger.error("close error", e);
        }
    }

    public boolean exists(String path) {
        boolean isExists = false;
        try {
            Path hdfsPath = new Path(path);
            isExists = fs.exists(hdfsPath);
        } catch (Exception ex) {
            logger.error("exists error: {}", ex.getMessage());
        }
        return isExists;
    }

    public String getModificationTime(String path) throws IOException {
        String modifyTime = null;
        try {
            Path hdfsPath = new Path(path);
            FileStatus fileStatus = fs.getFileStatus(hdfsPath);
            long modifyTimestamp = fileStatus.getModificationTime();
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
            Date date = new Date(modifyTimestamp);
            modifyTime = simpleDateFormat.format(date);
        } catch (Exception ex) {
            logger.error("getModificationTime error: {}", ex.getMessage());
        }
        return modifyTime;
    }
}
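For a quick sanity check outside Spring, a minimal driver like the one below can exercise the client. Every host name, port, and path here is a placeholder of my own, not a value from the cluster above:

public class HadoopClientDemo {
    public static void main(String[] args) throws Exception {
        HadoopClient client = new HadoopClient();
        // Placeholder values; take the real ones from your cluster's client config.
        client.setDefaultFS("hdfs://mycluster-tj");
        client.setZKQuorum("zk01:2181,zk02:2181,zk03:2181");
        client.setNameServices("mycluster-tj");
        client.setNameNodes("nn1,nn2");
        client.setRpcAddressNN1("namenode01:8020");
        client.setRpcAddressNN2("namenode02:8020");

        client.init();
        System.out.println("exists: " + client.exists("/tmp"));
        System.out.println("mtime:  " + client.getModificationTime("/tmp"));
        client.stop();
    }
}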
4. The Spring configuration:
import com.xiaoju.dqa.prometheus.client.hadoop.HadoopClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HadoopConfiguration {
    @Value("${hdfs.fs.defaultFS}")
    private String defaultFS;
    @Value("${hdfs.ha.zookeeper.quorum}")
    private String zKQuorum;
    @Value("${hdfs.dfs.nameservices}")
    private String nameServices;
    @Value("${hdfs.dfs.ha.namenodes.mycluster-tj}")
    private String nameNodes;
    @Value("${hdfs.dfs.namenode.rpc-address.mycluster-tj.nn1}")
    private String rpcAddressNN1;
    @Value("${hdfs.dfs.namenode.rpc-address.mycluster-tj.nn2}")
    private String rpcAddressNN2;

    @Bean(initMethod = "init", destroyMethod = "stop")
    public HadoopClient hadoopClient() {
        HadoopClient hadoopClient = new HadoopClient();
        hadoopClient.setDefaultFS(defaultFS);
        hadoopClient.setZKQuorum(zKQuorum);
        hadoopClient.setNameServices(nameServices);
        hadoopClient.setNameNodes(nameNodes);
        hadoopClient.setRpcAddressNN1(rpcAddressNN1);
        hadoopClient.setRpcAddressNN2(rpcAddressNN2);
        return hadoopClient;
    }
}
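Once the bean is registered, any Spring component can have the client injected. The service below is a hypothetical consumer I am adding for illustration; the class name and package are made up:

import com.xiaoju.dqa.prometheus.client.hadoop.HadoopClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class HdfsCheckService {
    // Spring injects the HadoopClient bean from HadoopConfiguration;
    // its init() has already run, so the FileSystem is ready to use.
    @Autowired
    private HadoopClient hadoopClient;

    public boolean isPathReady(String path) {
        return hadoopClient.exists(path);
    }
}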
Update: I got badly burned by a problem today, so I'm coming back to add to this article.
If the cluster you are accessing manages its data with viewfs, connecting the way described above is broken: only the namenodes that the URI and nameservices settings resolve to are reachable, and every other mount point is not! To fix this, drop the URI and the nameservices configuration from the API code and load the cluster client's hdfs-site.xml and core-site.xml directly.
It should look like this:
package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class HadoopClient {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    private FileSystem fs;

    public void init() {
        try {
            Configuration conf = new Configuration();
            // Load the cluster client's own config files from the classpath;
            // the viewfs mount table comes along with them.
            conf.addResource("core-site.xml");
            conf.addResource("hdfs-site.xml");
            conf.addResource("mount-table.xml");
            fs = FileSystem.get(conf);
        } catch (Exception ex) {
            logger.error("init error", ex);
        }
    }

    public void stop() {
        try {
            fs.close();
        } catch (Exception e) {
            logger.error("close error", e);
        }
    }

    public boolean exists(String path) {
        // Default to false so an error does not report a phantom file.
        boolean isExists = false;
        try {
            Path hdfsPath = new Path(path);
            isExists = fs.exists(hdfsPath);
        } catch (Exception e) {
            logger.error("[HDFS] exists check failed", e);
        }
        return isExists;
    }

    public String getModificationTime(String path) throws IOException {
        String modifyTime = null;
        try {
            Path hdfsPath = new Path(path);
            FileStatus fileStatus = fs.getFileStatus(hdfsPath);
            long modifyTimestamp = fileStatus.getModificationTime();
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
            Date date = new Date(modifyTimestamp);
            modifyTime = simpleDateFormat.format(date);
        } catch (Exception e) {
            logger.error("[HDFS] failed to get modification time", e);
        }
        return modifyTime;
    }

    public long getPathSize(String path) throws IOException {
        long size = -1L;
        try {
            Path hdfsPath = new Path(path);
            size = fs.getContentSummary(hdfsPath).getLength();
        } catch (Exception e) {
            logger.error("[HDFS] failed to get path size", e);
        }
        return size;
    }
}
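For reference, the mount-table.xml that viewfs reads maps logical paths to concrete nameservices, which is exactly why a single URI could not cover the whole namespace. The entries below are only a sketch with made-up cluster and path names; in practice, copy the real file from the cluster's client configuration rather than writing it by hand:

<configuration>
    <!-- Hypothetical mount table for a viewfs cluster named "cluster-tj":
         different logical paths can live on different nameservices. -->
    <property>
        <name>fs.viewfs.mounttable.cluster-tj.link./user</name>
        <value>hdfs://mycluster-tj/user</value>
    </property>
    <property>
        <name>fs.viewfs.mounttable.cluster-tj.link./logs</name>
        <value>hdfs://othercluster-tj/logs</value>
    </property>
</configuration>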
The config no longer needs any parameters passed in:
package com.xiaoju.dqa.jazz.hadoop.configuration;

import com.xiaoju.dqa.jazz.hadoop.client.HadoopClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HadoopConfig {
    @Bean(initMethod = "init", destroyMethod = "stop")
    public HadoopClient hadoopClient() {
        HadoopClient hadoopClient = new HadoopClient();
        return hadoopClient;
    }
}