基於commons-pool2實現FastDFS連線池+心跳檢測
阿新 • • 發佈:2021-08-19
yml 配置檔案
#fastdfs相關配置
fastdfs:
  connectTimeoutInSeconds: 30
  networkTimeoutInSeconds: 60
  charset: UTF-8
  httpAntiStealToken: no
  httpSecretKey: FastDFS1234567890
  httpTrackerHttpPort: 8888
  trackerServers: 82.157.0.217:22122
  maxStorageConnection: 10 #最大連線數
  defaultPoolSize: 5 #預設連線數
  connectionRetrySize: 5 #連線重試次數
TrackerServer客戶端工廠
package cn.com.fileparse.client.fdfs;

import cn.com.fileparse.exception.FileparseException;
import cn.com.fileparse.properties.FastDfsProperties;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.csource.fastdfs.ClientGlobal;
import org.csource.fastdfs.ProtoCommon;
import org.csource.fastdfs.TrackerClient;
import org.csource.fastdfs.TrackerServer;

import java.io.IOException;
import java.util.Properties;

/**
 * Factory that creates, validates and destroys {@link TrackerServer} connections for the pool.
 *
 * <p>The FastDFS client is (re)initialised from {@link FastDfsProperties} before every connection
 * attempt, so the factory needs no external configuration file.
 *
 * @author jiaqiangx
 */
@Slf4j
public class TrackerServerFactory extends BasePooledObjectFactory<TrackerServer> {

    private final FastDfsProperties fastDfsProperties;

    public TrackerServerFactory(FastDfsProperties fastDfsProperties) {
        this.fastDfsProperties = fastDfsProperties;
    }

    /**
     * Opens a tracker connection and verifies it with an active test.
     *
     * <p>The connection is closed only when it fails the health check; a healthy connection is
     * handed out open. (Previously the {@code finally} block closed every connection, including
     * the one being returned.)
     *
     * @return a live connection, or {@code null} when the client cannot be initialised, no
     *     connection can be obtained within the configured number of retries, or the active
     *     test fails
     */
    @Override
    public TrackerServer create() throws FileparseException {
        try {
            initClientGlobal();
        } catch (Exception e) {
            log.warn("FastDFS initClientGlobal failed [{}]", ExceptionUtils.getFullStackTrace(e));
            return null;
        }

        TrackerClient trackerClient = new TrackerClient();
        TrackerServer trackerServer;
        try {
            trackerServer = trackerClient.getConnection();
        } catch (IOException e) {
            log.warn("FastDFS connect failed [{}]", ExceptionUtils.getFullStackTrace(e));
            return null;
        }

        try {
            // A missing retry setting means "do not retry" instead of failing on unboxing.
            Integer configuredRetries = fastDfsProperties.getConnectionRetrySize();
            int maxRetries = configuredRetries == null ? 0 : configuredRetries;
            int attempt = 0;
            while (trackerServer == null && attempt < maxRetries) {
                attempt++;
                log.info("[建立TrackerServer(createTrackerServer)][第{}次重建]", attempt);
                initClientGlobal();
                trackerServer = trackerClient.getConnection();
            }
            if (trackerServer != null && ProtoCommon.activeTest(trackerServer.getSocket())) {
                return trackerServer;
            }
            log.warn("FastDFS tracker connection unavailable or failed the active test");
        } catch (Exception e) {
            log.error("[建立TrackerServer(createTrackerServer)][異常:{}]", ExceptionUtils.getFullStackTrace(e));
        }

        // Only unusable connections reach this point; release their socket.
        closeQuietly(trackerServer,
                "[建立TrackerServer(createTrackerServer)--關閉trackerServer異常][異常:{}]");
        return null;
    }

    @Override
    public PooledObject<TrackerServer> wrap(TrackerServer obj) {
        return new DefaultPooledObject<>(obj);
    }

    /**
     * Closes the wrapped connection. A {@code null} wrapper or a wrapper around {@code null} is
     * ignored.
     */
    @Override
    public void destroyObject(PooledObject<TrackerServer> ftpPooled) {
        if (ftpPooled != null) {
            closeQuietly(ftpPooled.getObject(), "FastDFS client logout failed...{}");
        }
    }

    /**
     * Checks whether the wrapped connection still answers an active test.
     *
     * @return {@code true} only when the active test succeeds; {@code false} for a missing
     *     connection or on any I/O error
     */
    @Override
    public boolean validateObject(PooledObject<TrackerServer> pooledObject) {
        if (pooledObject == null || pooledObject.getObject() == null) {
            return false;
        }
        try {
            return ProtoCommon.activeTest(pooledObject.getObject().getSocket());
        } catch (IOException e) {
            log.error("Failed to validate client: {}", ExceptionUtils.getFullStackTrace(e));
        }
        return false;
    }

    /** Closes {@code trackerServer} if present, logging (not propagating) any failure. */
    private void closeQuietly(TrackerServer trackerServer, String failureLogFormat) {
        if (trackerServer == null) {
            return;
        }
        try {
            trackerServer.close();
        } catch (Exception e) {
            log.error(failureLogFormat, ExceptionUtils.getFullStackTrace(e));
        }
    }

    /**
     * Initialises the global FastDFS client settings from {@link FastDfsProperties}.
     *
     * <p>A fresh {@link Properties} instance is built on every call so that no mutable state is
     * shared between threads or factory instances.
     */
    private void initClientGlobal() throws Exception {
        Properties properties = new Properties();
        putIfPresent(properties, ClientGlobal.PROP_KEY_CONNECT_TIMEOUT_IN_SECONDS, fastDfsProperties.getConnectTimeoutInSeconds());
        putIfPresent(properties, ClientGlobal.PROP_KEY_NETWORK_TIMEOUT_IN_SECONDS, fastDfsProperties.getNetworkTimeoutInSeconds());
        putIfPresent(properties, ClientGlobal.PROP_KEY_CHARSET, fastDfsProperties.getCharset());
        putIfPresent(properties, ClientGlobal.PROP_KEY_HTTP_TRACKER_HTTP_PORT, fastDfsProperties.getHttpTrackerHttpPort());
        putIfPresent(properties, ClientGlobal.PROP_KEY_HTTP_ANTI_STEAL_TOKEN, fastDfsProperties.getHttpAntiStealToken());
        putIfPresent(properties, ClientGlobal.PROP_KEY_HTTP_SECRET_KEY, fastDfsProperties.getHttpSecretKey());
        putIfPresent(properties, ClientGlobal.PROP_KEY_TRACKER_SERVERS, fastDfsProperties.getTrackerServers());
        ClientGlobal.initByProperties(properties);
    }

    /** Skips unset values because {@link Properties#put} rejects {@code null}. */
    private static void putIfPresent(Properties properties, String key, String value) {
        if (value != null) {
            properties.put(key, value);
        }
    }
}
FastDfsProperties 讀取yml配置
1 package cn.com.fileparse.properties; 2 3 import lombok.Data; 4 import org.springframework.boot.context.properties.ConfigurationProperties; 5 6 /** 7 * @author jiaqiangx 8 * @program ebs-file-service 9 * @description 10 * @create 2021-07-09 09:22 11 */ 12 @Data 13 @ConfigurationProperties(prefix = FastDfsProperties.PREFIX, ignoreUnknownFields = false) 14 public class FastDfsProperties { 15 public static final String PREFIX = "fastdfs"; 16 17 private String connectTimeoutInSeconds; 18 private String networkTimeoutInSeconds; 19 private String charset; 20 private String httpAntiStealToken; 21 private String httpSecretKey; 22 private String httpTrackerHttpPort; 23 private String trackerServers; 24 private Integer maxStorageConnection; 25 private Integer connectionRetrySize; 26 private Integer defaultPoolSize; 27 28 }
FastDFS 連線池
package cn.com.fileparse.client.fdfs;

import cn.com.fileparse.constant.ResponseCode;
import cn.com.fileparse.exception.FileparseException;
import cn.com.fileparse.util.YmlUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.pool2.BaseObjectPool;
import org.csource.fastdfs.TrackerServer;
import org.springframework.util.ObjectUtils;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Pool of idle {@link TrackerServer} connections backed by a bounded blocking queue.
 *
 * <p>The queue only ever holds <em>idle</em> connections: a borrowed connection is removed from
 * it and must be handed back through {@link #checkin(TrackerServer)} or
 * {@link #returnObject(TrackerServer)}.
 *
 * @author jiaqiangx
 */
@Slf4j
public class TrackerServerPool extends BaseObjectPool<TrackerServer> {

    /** How long {@link #borrowObject()} waits for an idle connection. */
    private static final long BORROW_TIMEOUT_SECONDS = 30;
    /** How long {@link #returnObject(TrackerServer)} and {@link #addObject()} wait for a free slot. */
    private static final long OFFER_TIMEOUT_SECONDS = 10;

    private static final int POOL_SIZE = Integer.valueOf(YmlUtils.getValue("fastdfs.defaultPoolSize"));
    private static final int POOL_MAX_SIZE = Integer.valueOf(YmlUtils.getValue("fastdfs.maxStorageConnection"));

    private final BlockingQueue<TrackerServer> dfsBlockingQueue;
    private final TrackerServerFactory trackerServerFactory;

    /**
     * Creates the pool and starts filling it in the background.
     *
     * @param trackerServerFactory factory that supplies the connections
     * @throws Exception declared for backward compatibility with existing callers
     */
    public TrackerServerPool(TrackerServerFactory trackerServerFactory) throws Exception {
        this.trackerServerFactory = trackerServerFactory;
        dfsBlockingQueue = new ArrayBlockingQueue<>(POOL_SIZE);
        initPool(POOL_SIZE);
    }

    /**
     * Fills the pool asynchronously so that an unreachable tracker does not block start-up.
     *
     * @param initialSize number of connections to try to create
     */
    private void initPool(int initialSize) {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        executorService.execute(() -> {
            log.info("-----Init TrackerServer Connect START------");
            for (int i = 0; i < initialSize; i++) {
                try {
                    addObject();
                } catch (Exception e) {
                    log.warn("初始化連線池失敗");
                }
            }
        });
        // Lets the queued task finish, then releases the worker thread instead of leaking it.
        executorService.shutdown();
    }

    /**
     * Takes an idle connection, waiting up to 30 seconds, and validates it before handing it out.
     * When no idle connection is available, or the idle one is dead, a new connection is created.
     *
     * <p>The returned connection is owned by the caller and is <em>not</em> kept in the idle
     * queue, so it can never be handed to a second caller at the same time.
     *
     * @return a connection, or {@code null} when the factory could not create one
     * @throws FileparseException with {@code CONNECT_OVERFLOW} when the idle queue already holds
     *     the configured maximum
     */
    @Override
    public TrackerServer borrowObject() throws FileparseException {
        TrackerServer client = null;
        try {
            client = dfsBlockingQueue.poll(BORROW_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.info("queue獲取任務異常,異常原因:{}", ExceptionUtils.getFullStackTrace(e));
        }

        if (client == null) {
            // NOTE(review): this compares the *idle* queue size with the maximum, so connections
            // currently borrowed are not counted; enforcing a hard cap needs a live-connection counter.
            if (dfsBlockingQueue.size() >= POOL_MAX_SIZE) {
                throw new FileparseException(ResponseCode.CONNECT_OVERFLOW);
            }
            return trackerServerFactory.create();
        }

        // Connections that sat idle for a long time may have been dropped by the peer.
        if (!trackerServerFactory.validateObject(trackerServerFactory.wrap(client))) {
            invalidateObject(client);
            return trackerServerFactory.create();
        }
        return client;
    }

    /**
     * Puts a connection back into the idle queue, waiting up to 10 seconds for a free slot.
     *
     * @throws FileparseException with {@code CONNECT_OVERFLOW} when no slot became free; the
     *     connection is destroyed in that case
     */
    @Override
    public void returnObject(TrackerServer client) throws FileparseException {
        if (client == null) {
            return;
        }
        try {
            if (dfsBlockingQueue.offer(client, OFFER_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                return;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("return TrackerServer client interrupted ...{}", ExceptionUtils.getFullStackTrace(e));
            // The connection was not queued; destroy it so the socket is not leaked.
            destroy(client);
            return;
        }
        destroy(client);
        throw new FileparseException(ResponseCode.CONNECT_OVERFLOW);
    }

    /** Destroys a connection and removes it from the idle queue; {@code null} is ignored. */
    @Override
    public void invalidateObject(TrackerServer client) {
        if (client == null) {
            return;
        }
        destroy(client);
        dfsBlockingQueue.remove(client);
    }

    /**
     * Creates one connection and adds it to the idle queue, waiting up to 10 seconds for a slot.
     * A connection that cannot be queued is destroyed.
     */
    @Override
    public void addObject() throws Exception {
        TrackerServer created = trackerServerFactory.create();
        if (created == null) {
            log.warn("TrackerServer could not be created; nothing added to the pool");
            return;
        }
        try {
            if (!dfsBlockingQueue.offer(created, OFFER_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                destroy(created);
            }
        } catch (InterruptedException e) {
            destroy(created);
            Thread.currentThread().interrupt();
            throw e;
        }
    }

    /**
     * Releases a connection after use: it is kept when the idle queue has room and destroyed
     * otherwise.
     *
     * @param trackerServer connection to release; {@code null} is ignored
     */
    public void checkin(TrackerServer trackerServer) {
        if (trackerServer == null) {
            return;
        }
        // Guards against the same instance being queued twice by repeated check-ins.
        if (dfsBlockingQueue.contains(trackerServer)) {
            return;
        }
        // offer() never throws on a full queue, unlike add(); the queue capacity is the pool size.
        if (!dfsBlockingQueue.offer(trackerServer)) {
            destroy(trackerServer);
        }
    }

    /** Destroys every idle connection currently in the queue. */
    @Override
    public void close() {
        try {
            TrackerServer client;
            // poll() returns null on an empty queue, so this cannot block like take().
            while ((client = dfsBlockingQueue.poll()) != null) {
                destroy(client);
            }
        } catch (Exception e) {
            log.error("close TrackerServer client dfsBlockingQueue failed...{}", ExceptionUtils.getFullStackTrace(e));
        }
    }

    /** Exposes the idle queue (used by the keep-alive task). */
    public BlockingQueue<TrackerServer> getDfsBlockingQueue() {
        return dfsBlockingQueue;
    }

    /** Hands the connection to the factory for destruction. */
    private void destroy(TrackerServer client) {
        trackerServerFactory.destroyObject(trackerServerFactory.wrap(client));
    }
}
心跳檢測
package cn.com.fileparse.client.fdfs;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.csource.fastdfs.ProtoCommon;
import org.csource.fastdfs.TrackerServer;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;

/**
 * Background heartbeat that periodically probes the idle {@link TrackerServer} connections of the
 * pool and discards the ones that no longer respond.
 *
 * @author jiaqiangx
 */
@Slf4j
public class TrackerServerKeepAlive {

    private static final String THREAD_NAME = "tracker-client-alive-thread";

    /** Pause between two heartbeat rounds. */
    private static final long HEARTBEAT_INTERVAL_MILLIS = 30_000L;

    /**
     * Wait time. Not read anywhere in this class; kept because it is public.
     */
    public static int waitTimes = 1200;

    private KeepAliveThread keepAliveThread;

    /** Thread running {@link #keepAliveThread}; kept so it can be interrupted on shutdown. */
    private Thread workerThread;

    @Autowired
    private TrackerServerPool trackerServerPool;

    /** Starts the heartbeat thread once. */
    @PostConstruct
    public void init() {
        if (keepAliveThread == null) {
            keepAliveThread = new KeepAliveThread();
            workerThread = new Thread(keepAliveThread, THREAD_NAME);
            // Daemon, so a forgotten heartbeat can never keep the JVM alive.
            workerThread.setDaemon(true);
            workerThread.start();
        }
    }

    /** Stops the heartbeat thread when the bean is destroyed. */
    @PreDestroy
    public void shutdown() {
        if (workerThread != null) {
            workerThread.interrupt();
        }
    }

    class KeepAliveThread implements Runnable {

        /** Runs one heartbeat round every 30 seconds until the thread is interrupted. */
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    checkIdleConnections();
                } catch (Exception e) {
                    log.error("trackerServer心跳檢測異常{}", ExceptionUtils.getFullStackTrace(e));
                }
                try {
                    Thread.sleep(HEARTBEAT_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Interruption is the stop signal: restore the flag and leave the loop.
                    Thread.currentThread().interrupt();
                    log.info("trackerServer heartbeat thread interrupted, stopping");
                    return;
                }
            }
        }

        /**
         * Probes each connection that is idle at the start of the round.
         *
         * <p>Every connection is taken <em>out</em> of the queue while it is probed, so a borrower
         * cannot use the same socket concurrently and a healthy connection is put back exactly
         * once.
         */
        private void checkIdleConnections() {
            BlockingQueue<TrackerServer> idleQueue = trackerServerPool.getDfsBlockingQueue();
            if (idleQueue == null) {
                return;
            }
            int idleCount = idleQueue.size();
            for (int i = 0; i < idleCount; i++) {
                TrackerServer ts = idleQueue.poll();
                if (ts == null) {
                    // No idle connection left (borrowed in the meantime).
                    break;
                }
                probe(ts);
            }
        }

        /** Returns a healthy connection to the pool and destroys a dead one. */
        private void probe(TrackerServer ts) {
            boolean alive = false;
            try {
                alive = ProtoCommon.activeTest(ts.getSocket());
            } catch (Exception e) {
                log.error("trackerServer心跳檢測異常{}", ExceptionUtils.getFullStackTrace(e));
            }
            log.info("trackerServer心跳檢測結果:{} ", alive);
            if (alive) {
                trackerServerPool.checkin(ts);
            } else {
                trackerServerPool.invalidateObject(ts);
            }
        }
    }
}
啟動初始化服務
1 package cn.com.fileparse.config; 2 3 import cn.com.fileparse.client.fdfs.TrackerServerKeepAlive; 4 import cn.com.fileparse.client.fdfs.TrackerServerFactory; 5 import cn.com.fileparse.client.fdfs.TrackerServerPool; 6 import cn.com.fileparse.properties.FastDfsProperties; 7 import lombok.extern.slf4j.Slf4j; 8 import org.springframework.beans.factory.annotation.Autowired; 9 import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; 10 import org.springframework.boot.context.properties.EnableConfigurationProperties; 11 import org.springframework.context.annotation.Bean; 12 13 /** 14 *@program 15 *@description 16 *@author jiaqiangx 17 *@create 2021-06-30 21:52 18 */ 19 @Slf4j 20 @EnableConfigurationProperties(FastDfsProperties.class) 21 public class FastDFSConfig { 22 @Autowired 23 FastDfsProperties fastDfsProperties; 24 /** 25 * 客戶端工廠 26 * 27 * @return 28 */ 29 @Bean 30 public TrackerServerFactory trackerServerFactory() { 31 return new TrackerServerFactory(fastDfsProperties); 32 } 33 34 /** 35 * 連線池 36 * 37 * @param trackerServerFactory 38 * @return 39 * @throws Exception 40 */ 41 @Bean 42 public TrackerServerPool TrackerServerClientPool(TrackerServerFactory trackerServerFactory) throws Exception { 43 return new TrackerServerPool(trackerServerFactory); 44 } 45 46 /** 47 * TrackerServer心跳檢測 48 */ 49 @Bean 50 @ConditionalOnBean(TrackerServerPool.class) 51 public TrackerServerKeepAlive trackerServerKeepAlive() { 52 return new TrackerServerKeepAlive(); 53 } 54 55 }
註解配置
package cn.com.fileparse.annotation;

import cn.com.fileparse.config.FastDFSConfig;
import org.springframework.context.annotation.Import;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Enables the FastDFS configuration: placing this annotation on a type imports
 * {@link FastDFSConfig}.
 *
 * @author jiaqiangx
 */
@Import(FastDFSConfig.class)
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface EnableFastDFS {
}
檔案上傳下載主類
package cn.com.fileparse.client.fdfs;

import cn.com.fileparse.annotation.EnableFastDFS;
import cn.com.fileparse.client.ftp.FtpClientPool;
import cn.com.fileparse.constant.ResponseCode;
import cn.com.fileparse.exception.FileparseException;
import cn.com.fileparse.util.common.SpringBeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.StringUtils;
import org.csource.common.MyException;
import org.csource.common.NameValuePair;
import org.csource.fastdfs.DownloadStream;
import org.csource.fastdfs.FileInfo;
import org.csource.fastdfs.StorageClient;
import org.csource.fastdfs.StorageClient1;
import org.csource.fastdfs.TrackerServer;
import org.csource.fastdfs.UploadCallback;
import org.csource.fastdfs.UploadStream;
import org.springframework.stereotype.Component;

import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * FastDFS Java API: main entry point for uploading, downloading, deleting and inspecting files.
 *
 * <p>Every operation borrows a {@link TrackerServer} from the {@link TrackerServerPool} bean and
 * checks it back in when done.
 */
@Slf4j
@EnableFastDFS
@Component
public class FastDFSClient {

    /**
     * Borrows a tracker connection from the pool. The caller owns the connection afterwards.
     *
     * @throws FileparseException with {@code GET_DFS_CONNECT_ERROR} when borrowing fails
     */
    public static TrackerServer getStorageClient() throws FileparseException {
        try {
            return pool().borrowObject();
        } catch (FileparseException e) {
            log.error("borrow TrackerServer failed: {}", ExceptionUtils.getFullStackTrace(e));
            throw new FileparseException(ResponseCode.GET_DFS_CONNECT_ERROR);
        }
    }

    /** Uploads a file without specifying a storage group. */
    public static String upload(File file) throws FileparseException {
        return upload(file, null);
    }

    /**
     * Uploads a file, attaching its extension, length and base name as metadata.
     *
     * @param file      file to upload
     * @param groupName storage group to upload to, or {@code null} for no specific group
     * @return the path reported by FastDFS
     * @throws FileparseException with {@code UPLOD_FILE_ERROR} on any failure
     */
    public static String upload(File file, String groupName) throws FileparseException {
        FileInputStream in = null;
        try {
            in = new FileInputStream(file);
            String fileExtName = FilenameUtils.getExtension(file.getName());
            NameValuePair[] metaList = buildMetaList(fileExtName, file.length(), file.getName());
            // groupName is forwarded; it used to be silently replaced by null.
            return upload(groupName, in.getChannel().size(), new UploadStream(in, file.length()), fileExtName, metaList);
        } catch (Exception e) {
            log.error("上傳檔案到fastdfs失敗!檔名:{}", file.getName());
            log.error("upload failure cause: {}", ExceptionUtils.getFullStackTrace(e));
            throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR);
        } finally {
            closeQuietly(in);
        }
    }

    /**
     * Uploads the content of a stream and closes the stream afterwards.
     *
     * <p>NOTE(review): the upload size is taken from {@link InputStream#available()}, which is
     * only an estimate for streams that are not fully buffered — confirm that callers pass
     * in-memory or file streams.
     *
     * @param in       content to upload; closed by this method
     * @param fileName original file name, used for the extension and metadata
     * @return the path reported by FastDFS
     * @throws FileparseException with {@code UPLOD_FILE_ERROR} on any failure
     */
    public static String upload(InputStream in, String fileName) throws FileparseException {
        try {
            String fileExtName = FilenameUtils.getExtension(fileName);
            // Read the size once so metadata, declared size and stream length always agree.
            long size = in.available();
            NameValuePair[] metaList = buildMetaList(fileExtName, size, fileName);
            return upload(null, size, new UploadStream(in, size), fileExtName, metaList);
        } catch (Exception e) {
            log.error("上傳檔案到fastdfs失敗!檔名:{}", fileName);
            log.error("upload failure cause: {}", ExceptionUtils.getFullStackTrace(e));
            throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR);
        } finally {
            closeQuietly(in);
        }
    }

    /**
     * General upload routine shared by the other {@code upload} overloads.
     *
     * @return the non-blank path reported by FastDFS
     * @throws FileparseException with {@code UPLOD_FILE_ERROR} when the upload fails or yields a
     *     blank path
     */
    public static String upload(String groupName, long file_size, UploadCallback callback, String file_ext_name, NameValuePair[] meta_list) throws FileparseException {
        TrackerServerPool trackerServerPool = pool();
        TrackerServer trackerServer = trackerServerPool.borrowObject();
        StorageClient1 storageClient = new StorageClient1(trackerServer, null);
        try {
            String path = storageClient.upload_file1(groupName, file_size, callback, file_ext_name, meta_list);
            if (StringUtils.isBlank(path)) {
                throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR);
            }
            log.info("上傳檔案到fastdfs成功!檔案地址:{}", path);
            return path;
        } catch (IOException | MyException e) {
            log.error("上傳檔案到fastdfs失敗!失敗原因:{}", ExceptionUtils.getFullStackTrace(e));
            throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR);
        } finally {
            // Hand the connection back to the pool.
            trackerServerPool.checkin(trackerServer);
        }
    }

    /**
     * Downloads a file into {@code file}, creating missing parent directories first.
     *
     * @throws FileparseException with {@code DOWN_FILE_ERROR_1} on any failure
     */
    public static void download(String filePathName, File file) throws FileNotFoundException, FileparseException {
        if (!file.exists()) {
            File parentFile = file.getParentFile();
            // getParentFile() is null for a bare file name.
            if (parentFile != null && !parentFile.exists()) {
                parentFile.mkdirs();
            }
        }
        // try-with-resources: the output stream used to be left open.
        try (FileOutputStream out = new FileOutputStream(file)) {
            download(filePathName, out);
        } catch (Exception e) {
            log.error("從fastdfs下載檔案失敗!失敗原因:{}", ExceptionUtils.getFullStackTrace(e));
            throw new FileparseException(ResponseCode.DOWN_FILE_ERROR_1);
        }
    }

    /**
     * Streams a file into {@code output}. The stream is not closed by this method.
     *
     * @throws FileparseException with the {@code DOWN_FILE_ERROR} code when the download throws
     *     or FastDFS reports a non-zero status
     */
    public static void download(String filePathName, OutputStream output) throws FileparseException {
        TrackerServerPool trackerServerPool = pool();
        TrackerServer trackerServer = trackerServerPool.borrowObject();
        StorageClient1 storageClient = new StorageClient1(trackerServer, null);
        try {
            int status = storageClient.download_file1(filePathName, new DownloadStream(output));
            if (status != 0) {
                // A non-zero status used to be ignored and logged as success.
                throw new IOException("FastDFS download returned error code " + status);
            }
            log.info("從fastdfs下載檔案成功!檔名稱:{}", filePathName);
        } catch (Exception e) {
            log.error("從fastdfs下載檔案失敗,檔案路徑:{}", filePathName);
            log.error("download failure cause: {}", ExceptionUtils.getFullStackTrace(e));
            throw new FileparseException(ResponseCode.DOWN_FILE_ERROR.getDesc() + e.getMessage(), ResponseCode.DOWN_FILE_ERROR.getCode());
        } finally {
            // Hand the connection back to the pool.
            trackerServerPool.checkin(trackerServer);
        }
    }

    /**
     * Deletes a file from the storage server.
     *
     * @param filePath path of the form {@code group/remote-file-name}
     * @return {@code true} when FastDFS reports status 0, {@code false} for any other status
     * @throws FileparseException with {@code DEF_ERROR_CODE} when the call throws
     */
    public static boolean deleteFile(String filePath) throws FileparseException {
        TrackerServerPool trackerServerPool = pool();
        TrackerServer trackerServer = trackerServerPool.borrowObject();
        StorageClient1 storageClient = new StorageClient1(trackerServer, null);
        try {
            String group_name = filePath.split("/")[0];
            String remote_filename = filePath.substring(group_name.length() + 1);
            boolean success = storageClient.delete_file(group_name, remote_filename) == 0;
            if (success) {
                log.info("已從fastdfs上刪除檔案");
            }
            return success;
        } catch (Exception e) {
            log.warn("從fastdfs上刪除檔案失敗");
            log.warn("delete failure cause: {}", ExceptionUtils.getFullStackTrace(e));
            throw new FileparseException(ResponseCode.DEF_ERROR_CODE);
        } finally {
            // Hand the connection back to the pool.
            trackerServerPool.checkin(trackerServer);
        }
    }

    /**
     * Looks up the size of a stored file.
     *
     * @param filePath path of the form {@code group/remote-file-name}
     * @return the file size reported by FastDFS
     * @throws FileparseException with {@code DOWN_FILE_IS_NULL} when FastDFS returns no file
     *     info, or with the {@code DOWN_FILE_ERROR} code when the call throws
     */
    public static long getFileSize(String filePath) throws FileparseException {
        TrackerServerPool trackerServerPool = pool();
        TrackerServer trackerServer = trackerServerPool.borrowObject();
        StorageClient1 storageClient = new StorageClient1(trackerServer, null);
        try {
            String group_name = filePath.split("/")[0];
            String remote_filename = filePath.substring(group_name.length() + 1);
            FileInfo fileInfo = storageClient.get_file_info(group_name, remote_filename);
            if (fileInfo == null) {
                log.warn("fastdfs查詢檔案為空");
                throw new FileparseException(ResponseCode.DOWN_FILE_IS_NULL);
            }
            return fileInfo.getFileSize();
        } catch (FileparseException e) {
            // Keep the specific DOWN_FILE_IS_NULL code instead of re-wrapping it below.
            throw e;
        } catch (Exception e) {
            log.warn("從fastdfs上獲取檔案大小失敗");
            log.warn("getFileSize failure cause: {}", ExceptionUtils.getFullStackTrace(e));
            throw new FileparseException(ResponseCode.DOWN_FILE_ERROR.getDesc() + e.getMessage(), ResponseCode.DOWN_FILE_ERROR.getCode());
        } finally {
            // Hand the connection back to the pool.
            trackerServerPool.checkin(trackerServer);
        }
    }

    /** Looks up the pool bean from the Spring context. */
    private static TrackerServerPool pool() {
        return SpringBeanUtil.getBean(TrackerServerPool.class);
    }

    /** Builds the three metadata entries stored with every upload. */
    private static NameValuePair[] buildMetaList(String fileExtName, long fileLength, String fileName) {
        return new NameValuePair[] {
            new NameValuePair("fileExtName", fileExtName),
            new NameValuePair("fileLength", String.valueOf(fileLength)),
            new NameValuePair("fileName", getFileNameString(fileName)),
        };
    }

    /** Closes a stream if present, logging (not propagating) a failure. */
    private static void closeQuietly(Closeable stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException e) {
            log.error("close FileInputStream failed...{}", ExceptionUtils.getFullStackTrace(e));
        }
    }

    /** Returns {@code fileName} without its last extension, or unchanged when it has no dot. */
    private static String getFileNameString(String fileName) {
        int lastDot = fileName.lastIndexOf('.');
        return lastDot >= 0 ? fileName.substring(0, lastDot) : fileName;
    }
}
簡單寫幾個工具類
package cn.com.fileparse.util.old;

import cn.com.fileparse.client.fdfs.FastDFSClient;
import cn.com.fileparse.exception.FileparseException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.csource.common.NameValuePair;
import org.csource.fastdfs.StorageClient1;
import org.csource.fastdfs.StorageServer;
import org.csource.fastdfs.UploadStream;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * Convenience facade over {@link FastDFSClient}, plus a direct upload to the storage server
 * running on localhost.
 *
 * @author jiaqiangx
 */
@Component
@Slf4j
public class FastDFSUtil {

    /** Uploads a file through {@link FastDFSClient} without specifying a storage group. */
    public static String upload(File file) throws FileparseException {
        return FastDFSClient.upload(file, null);
    }

    /** Uploads stream content through {@link FastDFSClient}. */
    public static String upload(InputStream in, String fileName) throws FileparseException {
        return FastDFSClient.upload(in, fileName);
    }

    /**
     * Uploads one file directly to the storage server at {@code localhost:23000}, bypassing the
     * tracker (intended for the CA service).
     *
     * @param file file to upload
     * @return the path reported by FastDFS
     * @throws RuntimeException wrapping any failure
     */
    public static String uploadFileToLocalhostStorage(File file) {
        String filePath;
        StorageServer storageServer = null;
        try (FileInputStream in = new FileInputStream(file)) {
            String fileExtName = FilenameUtils.getExtension(file.getName());
            storageServer = new StorageServer("localhost", 23000, 0);
            StorageClient1 client = new StorageClient1(null, storageServer);
            NameValuePair[] metaList = new NameValuePair[] {
                new NameValuePair("fileExtName", fileExtName),
                new NameValuePair("fileLength", String.valueOf(file.length())),
                new NameValuePair("fileName", getFileNameString(file.getName())),
            };
            filePath = client.upload_file1(null, in.getChannel().size(), new UploadStream(in, file.length()), fileExtName, metaList);
            log.info("上傳檔案到fastdfs成功!");
        } catch (Exception e) {
            log.error("上傳檔案到fastdfs失敗!檔名:{}", file.getName());
            log.error("upload failure cause: {}", ExceptionUtils.getFullStackTrace(e));
            throw new RuntimeException(e);
        } finally {
            try {
                if (storageServer != null) {
                    storageServer.close();
                }
            } catch (IOException e) {
                log.warn("storageServer關閉失敗");
            }
        }
        return filePath;
    }

    /**
     * Downloads a file into {@code file}, creating missing parent directories first.
     *
     * @throws FileNotFoundException when {@code file} cannot be opened for writing
     * @throws FileparseException    when the download fails
     */
    public static void download(String filePathName, File file) throws FileNotFoundException, FileparseException {
        if (!file.exists()) {
            File parentFile = file.getParentFile();
            // getParentFile() is null for a bare file name.
            if (parentFile != null && !parentFile.exists()) {
                parentFile.mkdirs();
            }
        }
        FileOutputStream out = new FileOutputStream(file);
        try {
            download(filePathName, out);
        } finally {
            // The stream used to be left open; close failures are logged like elsewhere in this class.
            try {
                out.close();
            } catch (IOException e) {
                log.warn("close FileOutputStream failed...{}", ExceptionUtils.getFullStackTrace(e));
            }
        }
    }

    /** Streams a file into {@code output} through {@link FastDFSClient}. */
    public static void download(String filePathName, OutputStream output) throws FileparseException {
        FastDFSClient.download(filePathName, output);
    }

    /**
     * Deletes a file from the storage server by its path.
     *
     * @param filePath path of the stored file
     * @return the result of {@link FastDFSClient#deleteFile(String)}
     */
    public static boolean deleteFile(String filePath) throws FileparseException {
        return FastDFSClient.deleteFile(filePath);
    }

    /**
     * Looks up the size of a stored file.
     *
     * @param filePath path of the stored file
     * @return the result of {@link FastDFSClient#getFileSize(String)}
     */
    public static long getFileSize(String filePath) throws FileparseException {
        return FastDFSClient.getFileSize(filePath);
    }

    /** Returns {@code fileName} without its last extension, or unchanged when it has no dot. */
    private static String getFileNameString(String fileName) {
        int lastDot = fileName.lastIndexOf('.');
        return lastDot >= 0 ? fileName.substring(0, lastDot) : fileName;
    }
}
測試類
package cn.com.fileparse.utils;

import cn.com.fileparse.client.fdfs.FastDFSClient;
import cn.com.fileparse.client.fdfs.TrackerServerPool;
import cn.com.fileparse.util.old.FastDFSUtil;
import org.csource.fastdfs.TrackerServer;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.File;

/**
 * Integration tests for the FastDFS helpers; they need a reachable FastDFS server.
 *
 * <p>Exceptions are propagated to JUnit so that a failing call actually fails the test
 * (previously every exception was caught and printed, so the tests could never fail).
 *
 * @author jiaqiangx
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class DFSUtilsTest {

    @Autowired
    private TrackerServerPool trackerServerPool;

    /** Uploads a local fixture file and expects a path back. */
    @Test
    public void upload() throws Exception {
        File image = new File("D:\\工作\\s.png");
        // The fixture lives on the author's machine; skip instead of failing elsewhere.
        Assume.assumeTrue("local fixture file is missing: " + image, image.isFile());
        String uploadedPath = FastDFSUtil.upload(image);
        Assert.assertNotNull(uploadedPath);
        System.out.println(uploadedPath);
    }

    /** Borrows a tracker connection, checks its socket and returns it to the pool. */
    @Test
    public void getStorageClient() throws Exception {
        TrackerServer trackerServer = FastDFSClient.getStorageClient();
        Assert.assertNotNull(trackerServer);
        try {
            Assert.assertNotNull(trackerServer.getSocket());
        } finally {
            trackerServerPool.checkin(trackerServer);
        }
    }
}