1. 程式人生 > >阿里OSS API總結

阿里OSS API總結

分三種

第一種是單機模式訪問oss的ossclient客戶端;

第二種是MR程式訪問oss的API,由emr封裝了的API,對於MR程式來說此種方法更可靠。

(親身經歷過:MR程式用ossclient訪問有時候會因為網路問題導致下載oss失敗,後來換成第二種API後不再出現此類問題)

(https://help.aliyun.com/document_detail/28117.html?spm=5176.product28066.6.618.5wAJKR)

第三種是可將oss檔案直接讀成RDD<String>形式,再手動拆分String封裝成物件,

詳見https://help.aliyun.com/document_detail/28116.html?spm=5176.product28066.6.616.NezW5R。

第一種

package com.jianfeitech.bd.res.db.oss.access;


import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.ListObjectsRequest;
import com.aliyun.oss.model.OSSObject;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.ObjectMetadata;
import com.aliyun.oss.model.PutObjectResult;
import com.jianfeitech.bd.common.conf.access.BdResConfAccess;
import com.jianfeitech.bd.common.conf.access.ResConfAccess;
import com.jianfeitech.bd.common.conf.model.db.Oss;
import com.jianfeitech.bd.util.crypto.MD5Util;


public class OSSAccess implements Closeable {
private static Logger logger = LoggerFactory.getLogger(OSSAccess.class);


public int maxKye;


private OssCli ossCli;


/**
* ossConfName   配置框架中配置的OSS的名字
* @param ossConfName
*/
public OSSAccess(String ossConfName,ResConfAccess resConfAccess) {
Oss dbOss = resConfAccess.getDbOss(ossConfName);
ossCli = new OssCli(dbOss.getEndPoint(), dbOss.getAccessKeyId(), dbOss.getAccessKeySecret());
maxKye = dbOss.getMaxKey();
}


/**
* 獲取 資料夾下每個檔案的大小

* @param bucketName
* @param prefix
* @param suffix
* @return
* @throws OSSException
* @throws ClientException
*/
public Map<String, Long> getOSSSizeByForder(String bucketName, String prefix, String suffix)
throws OSSException, ClientException {
Map<String, Long> resultMap = new HashMap<String, Long>();
ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName);
listObjectsRequest.setPrefix(prefix);
listObjectsRequest.setMaxKeys(maxKye);
ObjectListing listing = ossCli.listObjects(listObjectsRequest);
for (OSSObjectSummary objectSummary : listing.getObjectSummaries()) {
if (null != suffix && !"".equals(suffix)) {
if (objectSummary.getKey().endsWith(suffix)) {
resultMap.put(objectSummary.getKey(), objectSummary.getSize());
}
} else {
resultMap.put(objectSummary.getKey(), objectSummary.getSize());
}
}
return resultMap;
}


/**
* 刪除指定bucketName下的物件

* @param bucketName
* @param key
*/
public void deleteObj(String bucketName, String key) throws OSSException, ClientException {
ossCli.deleteObject(bucketName, key);
}


/**
* @param srcBucket
*            原檔案根目錄
* @param srcKey
*            原始檔全路徑
* @param destBucket
*            目標根目錄
* @param destKey
*            目標檔案全路徑
*/
public void copyFile(String srcBucket, String srcKey, String destBucket, String destKey) {
ossCli.copyObject(srcBucket, srcKey, destBucket, destKey);
}


/**
* 小檔案上傳工具,來自ObjectUtil類。原有一過載方法 putObject(OSSClient client, File
* sourceFile, String bucketName, String key, byte[]
* fileMD5),僅在modifiyObject方法中呼叫,修改modifiyObject呼叫處,過載方法刪除。

* @param client
* @param sourceFile
* @param bucketName
* @param key
* @param fileMD5
* @return
* @throws IOException
* @throws OSSException
* @throws ClientException
* @throws DecoderException
*/
public PutObjectResult putObject(File sourceFile, String bucketName, String key, String fileMD5)
throws IOException, OSSException, ClientException, DecoderException {
String md5 = null;
String md5Base64 = null;
if (StringUtils.isNotBlank(fileMD5)) {
md5 = fileMD5;
md5Base64 = Base64.encodeBase64String(Hex.decodeHex(md5.toCharArray()));
} else {
// 計算檔案md5
md5 = MD5Util.fileMD5(sourceFile.getAbsolutePath());
md5Base64 = Base64.encodeBase64String(Hex.decodeHex(md5.toCharArray()));
}


ObjectMetadata meta = new ObjectMetadata();
meta.setHeader("Content-MD5", md5Base64);
String fileName = sourceFile.getName();
Long fileSize = sourceFile.length();
meta.setContentLength(fileSize);
// UserMetadata
meta.addUserMetadata("md5", md5);
meta.addUserMetadata("md5base64", md5Base64);
meta.addUserMetadata("filename", fileName);
meta.addUserMetadata("filesize", fileSize.toString());
// 上傳檔案
try (InputStream content = new FileInputStream(sourceFile)) {
PutObjectResult result = ossCli.putObject(bucketName, key, content, meta);
return result;
}
}


/**
* put方式更新檔案 方法功能和putObject類似, 來自來自ObjectUtil類

* @param client
* @param sourceFile
* @param bucketName
* @param key
* @return
* @throws IOException
* @throws OSSException
* @throws ClientException
* @throws DecoderException
*/
public PutObjectResult modifiyObject(File sourceFile, String bucketName, String key)
throws IOException, OSSException, ClientException, DecoderException {


// 計算檔案md5值
String md5 = MD5Util.fileMD5(sourceFile.getAbsolutePath());


// 獲取 ObjectMetadata
ObjectMetadata objectMetadata = getMetadata(bucketName, key);


// 驗證檔案如果相同返回ObjectMetadata中PutObjectResult,否則重新上傳
if (null != objectMetadata && null != objectMetadata.getUserMetadata()
&& objectMetadata.getUserMetadata().containsKey("md5")) {
String ossMD5 = objectMetadata.getUserMetadata().get("md5");
if (ossMD5.equals(md5)) {// 檔案相同返回 PutObjectResult
PutObjectResult putObjectResult = new PutObjectResult();
putObjectResult.setETag(objectMetadata.getETag());
return putObjectResult;
}
}
return putObject(sourceFile, bucketName, key, md5);
}


public BufferedInputStream getOssStream(String bucketName, String key) throws IOException {
OSSObject ossObject = ossCli.getObject(bucketName, key);
BufferedInputStream bufferedInputStream = new BufferedInputStream(ossObject.getObjectContent());
return bufferedInputStream;
}


public ObjectMetadata getMetadata(String bucketName, String key) throws OSSException, ClientException, IOException {
ObjectMetadata objectMetadata = null;
try {
objectMetadata = ossCli.getObjectMetadata(bucketName, key);
} catch (OSSException e) {
if (!"NoSuchKey".equals(e.getErrorCode())) {
logger.error(e.getErrorMessage());
// throw e;
} else {
logger.error("NoSuchKey in the bucket");
// throw e;
}
}
return objectMetadata;
}


public void download(String bucketName, String ossFilePath, String targetPath)
throws OSSException, ClientException {
// OSSClient client = new OSSClient(OSSUtilHolder.endpoint,
// OSSUtilHolder.ossKey, OSSUtilHolder.ossSecret);
GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, ossFilePath);
ossCli.getObject(getObjectRequest, new File(targetPath));
}


public List<String> getOSSSummaryList(String bucketName, String prefix, String suffix, Integer MaxKeys)
throws OSSException, ClientException {
List<String> list = new ArrayList<String>();
// OSSClient client = new OSSClient(OSSUtilHolder.endpoint,
// OSSUtilHolder.ossKey, OSSUtilHolder.ossSecret);
ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName);
listObjectsRequest.setPrefix(prefix);
listObjectsRequest.setMaxKeys(maxKye);
ObjectListing listing = ossCli.listObjects(listObjectsRequest);
for (OSSObjectSummary objectSummary : listing.getObjectSummaries()) {
if (null != suffix && !"".equals(suffix)) {
if (objectSummary.getKey().endsWith(suffix)) {
list.add(objectSummary.getKey());
}
} else {
list.add(objectSummary.getKey());
}
// 如果控制了返回個數
if (null != MaxKeys && MaxKeys.intValue() != 0) {
if (list.size() == MaxKeys.intValue()) {
break;
}
}
}
return list;
}

public void putObjectWithClient(String bucketName, String key, File file) {
ossCli.putObject(bucketName, key, file);
}


public void putObjectWithClient(String bucketName, String key, InputStream input) {
ossCli.putObject(bucketName, key, input);
}


@Override
public void close() throws IOException {
ossCli.close();
}

public Boolean keyOSSExists(String key, String bucketName){
ObjectMetadata dataMap=null;
try{
dataMap = ossCli.getObjectMetadata(bucketName, key);
}catch(OSSException e){
if (!"NoSuchKey".equals(e.getErrorCode())) {
                throw e;
            }
}catch(ClientException e){
throw e;
}
if (dataMap == null) {
return false;
}
return true;
}


}

package com.jianfeitech.bd.res.db.oss.access;


import java.io.Closeable;
import java.io.IOException;


import com.aliyun.oss.OSSClient;


public  class OssCli extends OSSClient implements Closeable{


public OssCli(String endpoint, String ossKey, String ossSecret) {
// TODO Auto-generated constructor stub
super(endpoint,ossKey,ossSecret);
}


@Override
public void close() throws IOException {
// TODO Auto-generated method stub
super.shutdown();
}

}

第二種

package com.jianfeitech.bd.res.db.oss.access;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.jianfeitech.bd.common.conf.access.ResConfAccess;
import com.jianfeitech.bd.common.conf.model.db.Oss;

/**
 * Access to one OSS folder through the Hadoop {@link FileSystem} API, using the
 * {@code com.aliyun.fs.oss.nat.NativeOssFileSystem} implementation registered
 * for the {@code oss} scheme.
 *
 * <p>Each instance holds its own file system handle and folder path.
 *
 * @author Liqiang
 * @version 1.0 (created 2017-02-17)
 */
public class EmrOssAccess {
    private static final Logger logger = LoggerFactory.getLogger(EmrOssAccess.class);

    /** File-name suffixes of the data files this class returns from listings. */
    public static final String SUFFIX_DATA = ".tar.gz";
    public static final String SUFFIX_ZIP = ".zip";

    // Instance state: previously static, so a second instance overwrote the first one's handle.
    private final FileSystem fs;
    private final Path path;

    /**
     * Opens the OSS folder {@code ossFolderPath} of bucket {@code ossBucketName}.
     *
     * @param ossConfName   name of the OSS entry in the configuration framework
     * @param resConfAccess configuration accessor used to resolve {@code ossConfName}
     * @param ossBucketName bucket name
     * @param ossFolderPath folder (key prefix) inside the bucket
     * @throws IOException if the file system cannot be obtained
     */
    public EmrOssAccess(String ossConfName, ResConfAccess resConfAccess, String ossBucketName, String ossFolderPath) throws IOException {
        Oss dbOss = resConfAccess.getDbOss(ossConfName);
        String accessKeyId = dbOss.getAccessKeyId();
        String accessKeySecret = dbOss.getAccessKeySecret();
        String endPoint = dbOss.getEndPoint();
        // The secret is deliberately kept out of the logs.
        logger.info("accessKeyId is:{},endPoint is:{}", accessKeyId, endPoint);

        // URI shape: oss://<id>:<secret>@<bucket>.<endpoint host>/<folder>
        // The scheme of the configured endpoint is replaced by the dot that joins bucket and host.
        String hostSuffix = endPoint.replace("https://", ".").replace("http://", ".");
        String bucketAndPath = ossBucketName + hostSuffix + "/" + ossFolderPath;
        logger.info("OssDirComplete Is :oss://{}:******@{}", accessKeyId, bucketAndPath);

        path = new Path("oss://" + accessKeyId + ":" + accessKeySecret + "@" + bucketAndPath);
        Configuration conf = new Configuration();
        conf.set("fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem");
        fs = FileSystem.get(path.toUri(), conf);
    }

    /**
     * Lists the folder and returns the full path of every entry whose name ends
     * with {@link #SUFFIX_DATA} or {@link #SUFFIX_ZIP}; other entries are skipped.
     *
     * @return full paths, as strings, of the matching entries
     * @throws IOException if the listing fails
     */
    public List<String> getOssFileCompletePathList() throws IllegalArgumentException, IOException {
        List<String> ossFilePathList = new ArrayList<>();
        FileStatus[] fileList = fs.listStatus(path);
        logger.info("All File Count Is: {}", fileList.length);

        for (FileStatus status : fileList) {
            String fullPath = status.getPath().toString();
            if (!fullPath.endsWith(SUFFIX_DATA) && !fullPath.endsWith(SUFFIX_ZIP)) {
                // Log the name only: the full URI may embed the credentials used to open the folder.
                logger.info("Do Not Make Processing :{}", status.getPath().getName());
                continue;
            }
            ossFilePathList.add(fullPath);
        }
        return ossFilePathList;
    }

    /**
     * Downloads one OSS file into a local directory, creating the directory if needed.
     *
     * @param ossFilePath    full OSS path of the file, as returned by {@link #getOssFileCompletePathList()}
     * @param localStorePath local directory to download into
     * @return local path of the downloaded file: {@code localStorePath} joined with the file name
     * @throws IOException if the local directory cannot be created or the copy fails
     */
    public String download(String ossFilePath, String localStorePath) throws IllegalArgumentException, IOException {
        File localDir = new File(localStorePath);
        // Re-check existence after mkdirs() so a concurrent creation is not reported as a failure.
        if (!localDir.exists() && !localDir.mkdirs() && !localDir.exists()) {
            throw new IOException("Cannot create local directory: " + localStorePath);
        }
        fs.copyToLocalFile(new Path(ossFilePath), new Path(localStorePath));
        logger.info("Local Store Path Is: {}", localStorePath);

        String fileName = ossFilePath.substring(ossFilePath.lastIndexOf('/') + 1);
        // Insert a separator only when the directory does not already end with one.
        boolean hasTrailingSeparator = localStorePath.endsWith("/") || localStorePath.endsWith(File.separator);
        String filePath = hasTrailingSeparator
                ? localStorePath + fileName
                : localStorePath + File.separator + fileName;
        logger.info("Download File Complete Path Is: {}", filePath);
        return filePath;
    }
}