Implementing an S3 interface on Ceph storage (with resumable upload support)
Our company recently decided to adopt Ceph storage. After some research, I chose to implement it against Amazon's S3 API. The implementation class follows:
// Imports for the third-party libraries used below. Project-internal classes
// (UKID, S3ClientConfig, RedisManage, StorageManager, StorageManagerBase,
// FileCenterExceptionConstants, ChunkFileSaveParams, S3CacheMode, Result)
// come from the surrounding code base and their imports are omitted.
import com.alibaba.fastjson.JSONObject;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.*;
import com.amazonaws.util.IOUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * Title: S3Manager
 * Description: S3 interface implementation for Ceph storage. Reference docs:
 * https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/dev/RetrievingObjectUsingJava.html
 * http://docs.ceph.org.cn/radosgw/s3/
 * author: xu jun
 * date: 2018/10/22
 */
@Slf4j
@Service
public class S3Manager extends StorageManagerBase implements StorageManager {

    private final UKID ukid;
    private final S3ClientConfig s3ClientConfig;
    private final RedisManage redisManage;
    private AmazonS3 amazonClient;

    @Autowired
    public S3Manager(UKID ukid, S3ClientConfig s3ClientConfig, RedisManage redisManage) {
        this.ukid = ukid;
        this.s3ClientConfig = s3ClientConfig;
        this.redisManage = redisManage;
    }

    private AmazonS3 getAmazonClient() {
        if (amazonClient == null) {
            String accessKey = s3ClientConfig.getAccessKey();
            String secretKey = s3ClientConfig.getSecretKey();
            String endpoint = s3ClientConfig.getEndPoint();

            AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
            ClientConfiguration clientConfig = new ClientConfiguration();
            clientConfig.setProtocol(Protocol.HTTP);

            AmazonS3 conn = AmazonS3ClientBuilder.standard()
                    .withClientConfiguration(clientConfig)
                    .withCredentials(new AWSStaticCredentialsProvider(credentials))
                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, ""))
                    .withPathStyleAccessEnabled(true)
                    .build();

            // Make sure the bucket exists before caching the client
            checkBucket(conn);
            amazonClient = conn;
        }
        return amazonClient;
    }

    @Override
    public String uploadFile(byte[] fileData, String extension) {
        log.info("Storage s3 api, upload file start");
        // Generate a random serial number to use as the object key
        long fileId = ukid.getGeneratorID();
        String fileName = Long.toString(fileId);
        // Bucket name
        String bucketName = s3ClientConfig.getBucketName();
        AmazonS3 conn = getAmazonClient();

        // Set the content length so the SDK can stream the upload without buffering
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(fileData.length);
        PutObjectResult result =
                conn.putObject(bucketName, fileName, new ByteArrayInputStream(fileData), metadata);
        log.info("Storage s3 api, put object result :{}", result);

        log.info("Storage s3 api, upload file end, file name:" + fileName);
        return fileName;
    }

    @Override
    public String uploadAppenderFile(byte[] fileData, String extension) {
        log.info("Storage s3 api, upload appender file start");
        // Generate a random serial number to use as the object key
        long ukId = ukid.getGeneratorID();
        String fileName = Long.toString(ukId);
        // Bucket name
        String bucketName = s3ClientConfig.getBucketName();
        AmazonS3 conn = getAmazonClient();
        List<PartETag> partETags = new ArrayList<>();

        // Initiate the multipart upload
        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, fileName);
        InitiateMultipartUploadResult initResponse = conn.initiateMultipartUpload(initRequest);
        String uploadId = initResponse.getUploadId();

        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(fileData);
        int contentLength = fileData.length;

        // Upload the first part
        UploadPartRequest uploadPartRequest = new UploadPartRequest()
                .withBucketName(bucketName)
                .withKey(fileName)
                .withUploadId(uploadId)
                .withPartNumber(1)
                .withPartSize(contentLength)
                .withInputStream(byteArrayInputStream);
        UploadPartResult uploadPartResult = conn.uploadPart(uploadPartRequest);
        try {
            byteArrayInputStream.close();
        } catch (IOException e) {
            throw FileCenterExceptionConstants.INTERNAL_IO_EXCEPTION;
        }
        partETags.add(uploadPartResult.getPartETag());
        Integer partNumber = uploadPartResult.getPartNumber();

        // Cache the upload state in Redis so later chunks can resume this upload
        S3CacheMode cacheMode = new S3CacheMode();
        cacheMode.setPartETags(partETags);
        cacheMode.setPartNumber(partNumber);
        cacheMode.setUploadId(uploadId);
        redisManage.set(fileName, cacheMode);

        log.info("Storage s3 api, upload appender file end, fileName: {}", fileName);
        return fileName;
    }

    @Override
    public void uploadChunkFile(ChunkFileSaveParams chunkFileSaveParams) {
        log.info("Storage s3 api, upload chunk file start");
        String fileName = chunkFileSaveParams.getFileAddress();
        Result result = redisManage.get(fileName);
        JSONObject jsonObject = (JSONObject) result.getData();
        if (jsonObject == null) {
            throw FileCenterExceptionConstants.CACHE_DATA_NOT_EXIST;
        }
        S3CacheMode cacheMode = jsonObject.toJavaObject(S3CacheMode.class);
        Integer partNumber = cacheMode.getPartNumber();
        String uploadId = cacheMode.getUploadId();
        List<PartETag> partETags = cacheMode.getPartETags();
        // Bucket name
        String bucketName = s3ClientConfig.getBucketName();
        AmazonS3 conn = getAmazonClient();

        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(chunkFileSaveParams.getBytes());
        int contentLength = chunkFileSaveParams.getBytes().length;

        // Upload this chunk as the next part of the cached multipart upload
        UploadPartRequest uploadPartRequest = new UploadPartRequest()
                .withBucketName(bucketName)
                .withKey(fileName)
                .withUploadId(uploadId)
                .withPartNumber(partNumber + 1)
                .withPartSize(contentLength)
                .withInputStream(byteArrayInputStream);
        UploadPartResult uploadPartResult = conn.uploadPart(uploadPartRequest);
        partETags.add(uploadPartResult.getPartETag());
        partNumber = uploadPartResult.getPartNumber();
        try {
            byteArrayInputStream.close();
        } catch (IOException e) {
            throw FileCenterExceptionConstants.INTERNAL_IO_EXCEPTION;
        }

        // Persist the updated upload state back to Redis
        S3CacheMode cacheModeUpdate = new S3CacheMode();
        cacheModeUpdate.setPartETags(partETags);
        cacheModeUpdate.setPartNumber(partNumber);
        cacheModeUpdate.setUploadId(uploadId);
        redisManage.set(fileName, cacheModeUpdate);

        if (chunkFileSaveParams.getChunk().equals(chunkFileSaveParams.getChunks() - 1)) {
            // Last chunk: complete the multipart upload so the object becomes visible
            CompleteMultipartUploadRequest compRequest =
                    new CompleteMultipartUploadRequest(bucketName, fileName, uploadId, partETags);
            conn.completeMultipartUpload(compRequest);
        }
        log.info("Storage s3 api, upload chunk file end");
    }

    @Override
    public byte[] downloadFile(String fileName) {
        log.info("Storage s3 api, download file start");
        // Bucket name
        String bucketName = s3ClientConfig.getBucketName();
        AmazonS3 conn = getAmazonClient();
        S3Object object;
        if (conn.doesObjectExist(bucketName, fileName)) {
            object = conn.getObject(bucketName, fileName);
        } else {
            throw FileCenterExceptionConstants.OBJECT_NOT_EXIST;
        }
        log.debug("Storage s3 api, get object result :{}", object);

        byte[] fileByte;
        InputStream inputStream = object.getObjectContent();
        try {
            fileByte = IOUtils.toByteArray(inputStream);
        } catch (IOException e) {
            throw FileCenterExceptionConstants.INTERNAL_IO_EXCEPTION;
        } finally {
            try {
                inputStream.close();
            } catch (IOException e) {
                log.error(e.getMessage());
            }
        }
        log.info("Storage s3 api, download file end");
        return fileByte;
    }

    @Override
    public byte[] downloadFile(String fileName, long fileOffset, long fileSize) {
        log.info("Storage s3 api, download file by block start");
        // Bucket name
        String bucketName = s3ClientConfig.getBucketName();
        AmazonS3 conn = getAmazonClient();
        S3Object object;
        if (conn.doesObjectExist(bucketName, fileName)) {
            // Ranged download
            GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, fileName)
                    .withRange(fileOffset, fileOffset + fileSize);
            object = conn.getObject(getObjectRequest);
        } else {
            throw FileCenterExceptionConstants.OBJECT_NOT_EXIST;
        }
        log.info("Storage s3 api, get object result :{}", object);

        // Read the data
        byte[] buf;
        InputStream in = object.getObjectContent();
        try {
            buf = inputToByte(in, (int) fileSize);
        } catch (IOException e) {
            throw FileCenterExceptionConstants.INTERNAL_IO_EXCEPTION;
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                log.error(e.getMessage());
            }
        }
        log.info("Storage s3 api, download file by block end");
        return buf;
    }

    @Override
    public String fileSecret(String filePath) {
        return null;
    }

    @Override
    public String fileDecrypt(String filePath) {
        return null;
    }

    @Override
    public String getDomain() {
        return null;
    }

    /**
     * Check whether the bucket exists, creating it if necessary
     *
     * @param conn client connection
     */
    private void checkBucket(AmazonS3 conn) {
        // Bucket name
        String bucketName = s3ClientConfig.getBucketName();
        if (conn.doesBucketExist(bucketName)) {
            log.debug("Storage s3 api, bucketName is found: " + bucketName);
        } else {
            log.warn("Storage s3 api, bucketName is not exist, create it: " + bucketName);
            conn.createBucket(bucketName);
        }
    }

    /**
     * Convert an InputStream to byte[]
     *
     * @param inStream input stream
     * @param fileSize file size
     * @return the bytes read
     * @throws IOException on read failure
     */
    private static byte[] inputToByte(InputStream inStream, int fileSize) throws IOException {
        ByteArrayOutputStream swapStream = new ByteArrayOutputStream();
        byte[] buff = new byte[fileSize];
        int rc;
        while ((rc = inStream.read(buff, 0, fileSize)) > 0) {
            swapStream.write(buff, 0, rc);
        }
        return swapStream.toByteArray();
    }

    /**
     * Debug helper: prints the stream contents to the console/log.
     *
     * @param input input stream
     * @throws IOException on read failure
    private static void displayTextInputStream(InputStream input) throws IOException {
        // Read the text input stream one line at a time and display each line.
        BufferedReader reader = new BufferedReader(new InputStreamReader(input));
        String line;
        while ((line = reader.readLine()) != null) {
            log.info(line);
        }
    }
    */
}
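The S3CacheMode object that carries the multipart-upload state between calls is referenced above but never shown. Below is a minimal sketch inferred from the accessors used in S3Manager; the Lombok @Data annotation and Serializable are my assumptions, not the original code.

import com.amazonaws.services.s3.model.PartETag;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

/**
 * Hypothetical reconstruction of the upload state cached in Redis between chunks.
 * Field names match the getters/setters used in S3Manager; everything else is assumed.
 */
@Data
public class S3CacheMode implements Serializable {
    /** Upload id returned by initiateMultipartUpload */
    private String uploadId;
    /** Number of the last part uploaded successfully */
    private Integer partNumber;
    /** ETags of all parts uploaded so far, required to complete the upload */
    private List<PartETag> partETags;
}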
The business-facing interfaces need to provide multipart upload (with resumable upload support), chunked download, and similar features; S3Manager is only the low-level layer and contains no business logic.
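To make the resume flow concrete, here is a rough sketch of how a business layer might drive these methods. The ChunkFileSaveParams setters are assumed to mirror the getters used in uploadChunkFile, and the class name and chunk size are illustrative only.

import java.util.Arrays;

public class ChunkUploadExample {

    /**
     * Hypothetical caller: uploads a file in fixed-size chunks, resuming from the
     * state S3Manager keeps in Redis between calls.
     */
    public String uploadInChunks(S3Manager s3Manager, byte[] file) {
        int chunkSize = 5 * 1024 * 1024; // parts must be >= 5 MB except the last
        int chunks = (file.length + chunkSize - 1) / chunkSize;

        // The first chunk initiates the multipart upload and returns the object key
        byte[] first = Arrays.copyOfRange(file, 0, Math.min(chunkSize, file.length));
        String fileName = s3Manager.uploadAppenderFile(first, null);

        // Remaining chunks append to the same upload; each call can be retried after a crash
        for (int chunk = 1; chunk < chunks; chunk++) {
            int from = chunk * chunkSize;
            int to = Math.min(from + chunkSize, file.length);
            ChunkFileSaveParams params = new ChunkFileSaveParams();
            params.setFileAddress(fileName);                      // assumed setter
            params.setBytes(Arrays.copyOfRange(file, from, to));  // assumed setter
            params.setChunk(chunk);                               // assumed setter
            params.setChunks(chunks);                             // assumed setter
            s3Manager.uploadChunkFile(params);
        }
        return fileName;
    }
}

Note that this flow relies on uploadChunkFile to complete the multipart upload on the last chunk, so a file that fits in a single chunk should go through uploadFile instead.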
Maven dependency:
<!-- S3 interface for Ceph storage -->
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.11.433</version>
</dependency>
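One note on the dependency: the aggregate aws-java-sdk artifact pulls in clients for every AWS service. If the project only needs S3, the single-module artifact (same version line) should be enough:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-s3</artifactId>
    <version>1.11.433</version>
</dependency>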
Development notes:
1. The S3 interface documentation (Java) on the Ceph website is sparse and outdated, and is basically no longer usable as a reference. Look up the API and code samples on the Amazon site instead (it even offers a Chinese translation, which is appreciated).
2. The S3 API itself offers no append operation, so implementing chunked upload is fairly awkward (not as convenient as FastDFS or OSS); multipart upload, as used above, is the workaround.
3. Multipart upload parts have a default minimum size of 5 MB; to change it, adjust the server-side configuration (on Ceph RGW this is reportedly the rgw_multipart_min_part_size option).
4. If a domain name is used as the endpoint, the SDK by default addresses the bucket as a subdomain (which requires DNS resolution, so I don't recommend it). To address the bucket as a URL path instead, enable path-style access in the client configuration, as in the sketch after this list.
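On point 4: the addressing style only changes how the SDK builds request URLs. A minimal sketch, with an illustrative endpoint and credentials elided:

// Virtual-hosted style (SDK default): http://<bucket>.s3.example.com/<key>
// — needs a wildcard DNS record *.s3.example.com pointing at the RGW endpoint.
// Path style: http://s3.example.com/<bucket>/<key> — no extra DNS records needed.
AmazonS3 conn = AmazonS3ClientBuilder.standard()
        .withEndpointConfiguration(
                new AwsClientBuilder.EndpointConfiguration("http://s3.example.com", ""))
        .withPathStyleAccessEnabled(true) // opt in to path-style URLs
        .build();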