1. 程式人生 > 其它 >基於commons-pool2實現FastDFS連線池+心跳檢測

基於commons-pool2實現FastDFS連線池+心跳檢測

yml 配置檔案

#fastdfs相關配置
fastdfs:
  connectTimeoutInSeconds: 30
  networkTimeoutInSeconds: 60
  charset: UTF-8
  httpAntiStealToken: no
  httpSecretKey: FastDFS1234567890
  httpTrackerHttpPort: 8888
  trackerServers: 82.157.0.217:22122
  maxStorageConnection: 10 #最大連線數
  defaultPoolSize: 5 #預設連線數
  connectionRetrySize: 5 #連線重試次數
TrackerServer客戶端工廠
  package cn.com.fileparse.client.fdfs;

  import cn.com.fileparse.exception.FileparseException;
  import cn.com.fileparse.properties.FastDfsProperties;
  import lombok.extern.slf4j.Slf4j;
  import org.apache.commons.lang.exception.ExceptionUtils;
  import org.apache.commons.pool2.BasePooledObjectFactory;
  import org.apache.commons.pool2.PooledObject;
  import org.apache.commons.pool2.impl.DefaultPooledObject;
  import org.csource.fastdfs.ClientGlobal;
  import org.csource.fastdfs.ProtoCommon;
  import org.csource.fastdfs.TrackerClient;
  import org.csource.fastdfs.TrackerServer;

  import java.io.IOException;
  import java.util.Properties;

  /**
   * Factory that creates, validates and destroys {@link TrackerServer} connections for the pool.
   *
   * <p>Every call to {@link #create()} (re)initialises {@code ClientGlobal} from the injected
   * {@link FastDfsProperties} before a connection is requested.
   *
   * @author jiaqiangx
   * @create 2021-08-13 11:20
   */
  @Slf4j
  public class TrackerServerFactory extends BasePooledObjectFactory<TrackerServer> {

      private final FastDfsProperties fastDfsProperties;

      public TrackerServerFactory(FastDfsProperties fastDfsProperties) {
          this.fastDfsProperties = fastDfsProperties;
      }

      /**
       * Opens a tracker connection and verifies it with an active test.
       *
       * @return a connection that passed the active test, or {@code null} when the client could
       *         not be initialised, no connection could be obtained, or the active test failed
       * @throws FileparseException declared for callers; not thrown by this implementation
       */
      @Override
      public TrackerServer create() throws FileparseException {
          try {
              initClientGlobal();
          } catch (Exception e) {
              log.warn("FastDFS initClientGlobal failed [{}]", ExceptionUtils.getFullStackTrace(e));
              return null;
          }

          TrackerClient trackerClient = new TrackerClient();
          TrackerServer trackerServer;
          try {
              trackerServer = trackerClient.getConnection();
          } catch (IOException e) {
              log.warn("FastDFS connect failed ", e);
              return null;
          }

          boolean usable = false;
          try {
              int attempt = 0;
              // getConnection() may hand back null; retry up to the configured number of times.
              while (trackerServer == null && attempt < fastDfsProperties.getConnectionRetrySize()) {
                  log.info("[建立TrackerServer(createTrackerServer)][第{}次重建]", attempt);
                  attempt++;
                  initClientGlobal();
                  trackerServer = trackerClient.getConnection();
              }
              if (trackerServer == null) {
                  log.warn("FastDFS connect failed ");
              } else {
                  usable = ProtoCommon.activeTest(trackerServer.getSocket());
              }
          } catch (Exception e) {
              log.error("[建立TrackerServer(createTrackerServer)][異常:{}]", ExceptionUtils.getFullStackTrace(e));
          } finally {
              // Close only connections that are NOT handed out; the previous code also closed
              // the connection it was about to return.
              if (!usable && trackerServer != null) {
                  try {
                      trackerServer.close();
                  } catch (Exception e) {
                      log.error("[建立TrackerServer(createTrackerServer)--關閉trackerServer異常][異常:{}]", ExceptionUtils.getFullStackTrace(e));
                  }
              }
          }
          return usable ? trackerServer : null;
      }

      /** Wraps a connection in the default pooled-object holder. */
      @Override
      public PooledObject<TrackerServer> wrap(TrackerServer obj) {
          return new DefaultPooledObject<>(obj);
      }

      /**
       * Closes the connection held by the given pooled object.
       * A {@code null} wrapper or a wrapper holding {@code null} is ignored.
       */
      @Override
      public void destroyObject(PooledObject<TrackerServer> ftpPooled) {
          if (ftpPooled == null || ftpPooled.getObject() == null) {
              return;
          }
          try {
              ftpPooled.getObject().close();
          } catch (IOException e) {
              log.error("FastDFS client logout failed...{}", ExceptionUtils.getFullStackTrace(e));
          }
      }

      /**
       * Checks whether the pooled connection still answers an active test.
       *
       * @return {@code true} only when the active test succeeds; {@code false} for a missing
       *         connection or when the test raises an {@link IOException}
       */
      @Override
      public boolean validateObject(PooledObject<TrackerServer> pooledObject) {
          if (pooledObject == null || pooledObject.getObject() == null) {
              return false;
          }
          try {
              return ProtoCommon.activeTest(pooledObject.getObject().getSocket());
          } catch (IOException e) {
              log.error("Failed to validate client: {}", ExceptionUtils.getFullStackTrace(e));
          }
          return false;
      }

      /**
       * Pushes the configured values into {@code ClientGlobal}.
       * A fresh {@link Properties} instance is built per call so that concurrent factories never
       * share mutable state.
       */
      private void initClientGlobal() throws Exception {
          Properties properties = new Properties();
          properties.put(ClientGlobal.PROP_KEY_CONNECT_TIMEOUT_IN_SECONDS, fastDfsProperties.getConnectTimeoutInSeconds());
          properties.put(ClientGlobal.PROP_KEY_NETWORK_TIMEOUT_IN_SECONDS, fastDfsProperties.getNetworkTimeoutInSeconds());
          properties.put(ClientGlobal.PROP_KEY_CHARSET, fastDfsProperties.getCharset());
          properties.put(ClientGlobal.PROP_KEY_HTTP_TRACKER_HTTP_PORT, fastDfsProperties.getHttpTrackerHttpPort());
          properties.put(ClientGlobal.PROP_KEY_HTTP_ANTI_STEAL_TOKEN, fastDfsProperties.getHttpAntiStealToken());
          properties.put(ClientGlobal.PROP_KEY_HTTP_SECRET_KEY, fastDfsProperties.getHttpSecretKey());
          properties.put(ClientGlobal.PROP_KEY_TRACKER_SERVERS, fastDfsProperties.getTrackerServers());
          ClientGlobal.initByProperties(properties);
      }

  }

FastDfsProperties 讀取yml配置

 1 package cn.com.fileparse.properties;
 2 
 3 import lombok.Data;
 4 import org.springframework.boot.context.properties.ConfigurationProperties;
 5 
 6 /**
 7  * @author jiaqiangx
 8  * @program ebs-file-service
 9  * @description
10  * @create 2021-07-09 09:22
11  */
12 @Data
13 @ConfigurationProperties(prefix = FastDfsProperties.PREFIX, ignoreUnknownFields = false)
14 public class FastDfsProperties {
15     public static final String PREFIX = "fastdfs";
16 
17     private String connectTimeoutInSeconds;
18     private String networkTimeoutInSeconds;
19     private String charset;
20     private String httpAntiStealToken;
21     private String httpSecretKey;
22     private String httpTrackerHttpPort;
23     private String trackerServers;
24     private Integer maxStorageConnection;
25     private Integer connectionRetrySize;
26     private Integer defaultPoolSize;
27 
28 }
FastDFS 連線池
  1 package cn.com.fileparse.client.fdfs;
  2 
  3 import cn.com.fileparse.constant.ResponseCode;
  4 import cn.com.fileparse.exception.FileparseException;
  5 import cn.com.fileparse.util.YmlUtils;
  6 import lombok.extern.slf4j.Slf4j;
  7 import org.apache.commons.lang.exception.ExceptionUtils;
  8 import org.apache.commons.pool2.BaseObjectPool;
  9 import org.csource.fastdfs.TrackerServer;
 10 import org.springframework.util.ObjectUtils;
 11 
 12 import java.util.concurrent.ArrayBlockingQueue;
 13 import java.util.concurrent.BlockingQueue;
 14 import java.util.concurrent.ExecutorService;
 15 import java.util.concurrent.Executors;
 16 import java.util.concurrent.TimeUnit;
 17 
 18 /**
 19 *@program  
 20 *@description FastDFS client 池
 21 *@author jiaqiangx
 22 *@create 2021-08-13 11:20
 23 */
 24 @Slf4j
 25 public class TrackerServerPool extends BaseObjectPool {
 26     private final BlockingQueue dfsBlockingQueue;
 27     private final TrackerServerFactory trackerServerFactory;
 28     private static Integer poolSize =  Integer.valueOf(YmlUtils.getValue("fastdfs.defaultPoolSize"));
 29     private static Integer poolMaxSize =  Integer.valueOf(YmlUtils.getValue("fastdfs.maxStorageConnection"));
 30     /**
 31      * 初始化連線池,需要注入一個工廠來提供TrackerServer例項
 32      *
 33      * @param trackerServerFactory trackerServe工廠
 34      * @throws Exception
 35      */
 36     public TrackerServerPool(TrackerServerFactory trackerServerFactory) throws Exception {
 37         this.trackerServerFactory = trackerServerFactory;
 38         dfsBlockingQueue = new ArrayBlockingQueue<>(poolSize);
 39         initPool(poolSize);
 40     }
 41 
 42 
 43     /**
 44      * 初始化連線池,需要注入一個工廠來提供TrackerServer例項
 45      *
 46      * @param maxPoolSize 最大連線數
 47      * @throws Exception
 48      */
 49 
 50     private void initPool(int maxPoolSize) throws Exception {
 51         ExecutorService executorService = Executors.newFixedThreadPool(1);
 52         executorService.submit(() -> {
 53             log.info("-----Init TrackerServer Connect START------");
 54             for (int i = 0; i < maxPoolSize; i++) {
 55                 // 往池中新增物件
 56                 try {
 57                     addObject();
 58                 }catch (Exception e){
 59                     log.warn("初始化連線池失敗");
 60                 }
 61             }
 62         });
 63     }
 64 
 65     /**
 66      * 獲取連線
 67      *
 68      * @return
 69      * @throws Exception
 70      */
 71     @Override
 72     public TrackerServer borrowObject() throws FileparseException {
 73         TrackerServer client = null;
 74         try {
 75             //client = dfsBlockingQueue.take();
 76             //從佇列中獲取值 預設30S超時
 77             client = dfsBlockingQueue.poll(30,TimeUnit.SECONDS);
 78         } catch (InterruptedException e) {
 79             log.info("queue獲取任務異常,異常原因:{}", ExceptionUtils.getFullStackTrace(e));
 80         }
 81 
 82         if (ObjectUtils.isEmpty(client)) {
 83             if (dfsBlockingQueue.size() < poolMaxSize) {
 84                 client = trackerServerFactory.create();
 85                 // 放入連線池
 86                 returnObject(client);
 87             }else {
 88                 //連線數已滿
 89                 throw new FileparseException(ResponseCode.CONNECT_OVERFLOW);
 90             }
 91             // 驗證物件是否有效  這裡通過實踐驗證 如果長時間不校驗是否存活,則這裡會報通道已斷開等錯誤
 92         } else if (!trackerServerFactory.validateObject(trackerServerFactory.wrap(client))) {
 93             // 對無效的物件進行處理
 94             invalidateObject(client);
 95             // 建立新的物件
 96             client = trackerServerFactory.create();
 97             // 將新的物件放入連線池
 98             returnObject(client);
 99         }
100         return client;
101     }
102 
103     @Override
104     public void returnObject(TrackerServer client) throws FileparseException{
105         try {
106            if (client != null && !dfsBlockingQueue.offer(client, 10, TimeUnit.SECONDS)) {
107                 trackerServerFactory.destroyObject(trackerServerFactory.wrap(client));
108                //連線池已滿 丟擲指定異常
109                throw new FileparseException(ResponseCode.CONNECT_OVERFLOW);
110             }
111         } catch (InterruptedException e) {
112             log.error("return TrackerServer client interrupted ...{}", ExceptionUtils.getFullStackTrace(e));
113         }
114     }
115 
116     @Override
117     public void invalidateObject(TrackerServer client) {
118         trackerServerFactory.destroyObject(trackerServerFactory.wrap(client));
119         dfsBlockingQueue.remove(client);
120     }
121 
122     /**
123      * 增加一個新的連結,超時失效
124      */
125     @Override
126     public void addObject() throws Exception {
127         // 插入物件到佇列
128         dfsBlockingQueue.offer(trackerServerFactory.create(), 10, TimeUnit.SECONDS);
129     }
130 
131     /**
132      * @param trackerServer 需釋放的連線物件
133      * @Description: 釋放繁忙連線 1.如果空閒池的連線小於最小連線值,就把當前連線放入idleConnectionPool;
134      * 2.如果空閒池的連線等於或大於最小連線值,就把當前釋放連線丟棄;
135      */
136     public void checkin(TrackerServer trackerServer) {
137         //log.info("[釋放當前連線(checkin)][prams:{}}] ", trackerServer);
138         if (trackerServer != null) {
139             if (dfsBlockingQueue.size() < poolSize) {
140                 dfsBlockingQueue.add(trackerServer);
141             } else {
142                 trackerServerFactory.destroyObject(trackerServerFactory.wrap(trackerServer));
143             }
144         }
145 
146     }
147 
148 
149     /**
150      * 關閉連線池
151      */
152     @Override
153     public void close() {
154         try {
155             while (dfsBlockingQueue.iterator().hasNext()) {
156                 TrackerServer client = dfsBlockingQueue.take();
157                 trackerServerFactory.destroyObject(trackerServerFactory.wrap(client));
158             }
159         } catch (Exception e) {
160             log.error("close TrackerServer client dfsBlockingQueue failed...{}", ExceptionUtils.getFullStackTrace(e));
161         }
162     }
163 
164     public BlockingQueue getDfsBlockingQueue() {
165         return dfsBlockingQueue;
166     }
167 
168 }

心跳檢測

 1 package cn.com.fileparse.client.fdfs;
 2 
 3 import lombok.extern.slf4j.Slf4j;
 4 import org.apache.commons.lang.exception.ExceptionUtils;
 5 import org.csource.fastdfs.ProtoCommon;
 6 import org.csource.fastdfs.TrackerServer;
 7 import org.springframework.beans.factory.annotation.Autowired;
 8 
 9 import javax.annotation.PostConstruct;
10 import java.util.Iterator;
11 import java.util.concurrent.BlockingQueue;
12 
13 /**
14 *@program 
15 *@description 檢測TrackerServer客戶端是否在活著
16 *@author jiaqiangx
17 *@create 2021-06-29 22:42
18 */
19 @Slf4j
20 public class TrackerServerKeepAlive {
21 
22     private KeepAliveThread keepAliveThread;
23 
24     @Autowired
25     private TrackerServerPool trackerServerPool;
26 
27     private final String THREAD_NAME = "tracker-client-alive-thread";
28 
29     /**
30      * 等待時間
31      */
32     public static int waitTimes = 1200;
33 
34 
35     @PostConstruct
36     public void init() {
37         // 啟動心跳檢測執行緒
38         if (keepAliveThread == null) {
39             keepAliveThread = new KeepAliveThread();
40             Thread thread = new Thread(keepAliveThread, THREAD_NAME);
41             thread.start();
42         }
43     }
44 
45     class KeepAliveThread implements Runnable {
46         @Override
47         public void run() {
48             TrackerServer ts = null;
49             while (true) {
50                 try {
51                     BlockingQueue pool = trackerServerPool.getDfsBlockingQueue();
52                     if (pool != null && pool.size() > 0) {
53                         Iterator it = pool.iterator();
54                         while (it.hasNext()) {
55                             ts = it.next();
56                             if (ts != null) {
57                                 boolean result = ProtoCommon.activeTest(ts.getSocket());
58                                 log.info("trackerServer心跳檢測結果:{} ", result);
59                                 if(!result){
60                                     //清除失效連線
61                                     trackerServerPool.invalidateObject(ts);
62                                 }else {
63                                     //回收空閒連線
64                                     trackerServerPool.checkin(ts);
65                                 }
66                             } else {
67                                 /** 代表已經沒有空閒長連線 */
68                                 break;
69                             }
70                         }
71                     }
72                 } catch (Exception e) {
73                     log.error("trackerServer心跳檢測異常{}", ExceptionUtils.getFullStackTrace(e));
74                     trackerServerPool.invalidateObject(ts);
75                 }
76                 // 每30s傳送一次心跳
77                 try {
78                     Thread.sleep(1000 * 30);
79                 } catch (InterruptedException e) {
80                     log.error("trackerServer休眠異常{}", ExceptionUtils.getFullStackTrace(e));
81                 }
82             }
83 
84         }
85     }
86 }

啟動初始化服務

 1 package cn.com.fileparse.config;
 2 
 3 import cn.com.fileparse.client.fdfs.TrackerServerKeepAlive;
 4 import cn.com.fileparse.client.fdfs.TrackerServerFactory;
 5 import cn.com.fileparse.client.fdfs.TrackerServerPool;
 6 import cn.com.fileparse.properties.FastDfsProperties;
 7 import lombok.extern.slf4j.Slf4j;
 8 import org.springframework.beans.factory.annotation.Autowired;
 9 import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
10 import org.springframework.boot.context.properties.EnableConfigurationProperties;
11 import org.springframework.context.annotation.Bean;
12 
13 /**
14 *@program 
15 *@description
16 *@author jiaqiangx
17 *@create 2021-06-30 21:52
18 */
19 @Slf4j
20 @EnableConfigurationProperties(FastDfsProperties.class)
21 public class FastDFSConfig {
22     @Autowired
23     FastDfsProperties fastDfsProperties;
24     /**
25      * 客戶端工廠
26      *
27      * @return
28      */
29     @Bean
30     public TrackerServerFactory trackerServerFactory() {
31         return new TrackerServerFactory(fastDfsProperties);
32     }
33 
34     /**
35      * 連線池
36      *
37      * @param trackerServerFactory
38      * @return
39      * @throws Exception
40      */
41     @Bean
42     public TrackerServerPool TrackerServerClientPool(TrackerServerFactory trackerServerFactory) throws Exception {
43         return new TrackerServerPool(trackerServerFactory);
44     }
45 
46     /**
47      * TrackerServer心跳檢測
48      */
49     @Bean
50     @ConditionalOnBean(TrackerServerPool.class)
51     public TrackerServerKeepAlive trackerServerKeepAlive() {
52         return new TrackerServerKeepAlive();
53     }
54 
55 }

註解配置

 1 package cn.com.fileparse.annotation;
 2 import cn.com.fileparse.config.FastDFSConfig;
 3 import org.springframework.context.annotation.Import;
 4 
 5 import java.lang.annotation.ElementType;
 6 import java.lang.annotation.Retention;
 7 import java.lang.annotation.RetentionPolicy;
 8 import java.lang.annotation.Target;
 9 
10 /**
11 *@program 
12 *@description 啟用FastDFS自動配置
13 *@author jiaqiangx
14 *@create 2021-08-13 14:17
15 */
16 @Target(ElementType.TYPE)
17 @Retention(RetentionPolicy.RUNTIME)
18 @Import(FastDFSConfig.class)
19 public @interface EnableFastDFS {
20 }
檔案上傳下載主類
  1 package cn.com.fileparse.client.fdfs;
  2 
  3 import cn.com.fileparse.annotation.EnableFastDFS;
  4 import cn.com.fileparse.client.ftp.FtpClientPool;
  5 import cn.com.fileparse.constant.ResponseCode;
  6 import cn.com.fileparse.exception.FileparseException;
  7 import cn.com.fileparse.util.common.SpringBeanUtil;
  8 import lombok.extern.slf4j.Slf4j;
  9 import org.apache.commons.io.FilenameUtils;
 10 import org.apache.commons.lang.exception.ExceptionUtils;
 11 import org.apache.commons.lang3.StringUtils;
 12 import org.csource.common.MyException;
 13 import org.csource.common.NameValuePair;
 14 import org.csource.fastdfs.DownloadStream;
 15 import org.csource.fastdfs.FileInfo;
 16 import org.csource.fastdfs.StorageClient;
 17 import org.csource.fastdfs.StorageClient1;
 18 import org.csource.fastdfs.TrackerServer;
 19 import org.csource.fastdfs.UploadCallback;
 20 import org.csource.fastdfs.UploadStream;
 21 import org.springframework.stereotype.Component;
 22 
 23 import java.io.File;
 24 import java.io.FileInputStream;
 25 import java.io.FileNotFoundException;
 26 import java.io.FileOutputStream;
 27 import java.io.IOException;
 28 import java.io.InputStream;
 29 import java.io.OutputStream;
 30 
 31 /**
 32  * FastDFS Java API. 檔案上傳下載主類.
 33  *
 34  */
 35 @Slf4j
 36 @EnableFastDFS
 37 @Component
 38 public class FastDFSClient {
 39 
 40     public static TrackerServer getStorageClient() throws FileparseException{
 41         TrackerServerPool trackerServerPool = SpringBeanUtil.getBean(TrackerServerPool.class);
 42         TrackerServer trackerServer = null;
 43         try {
 44             trackerServer = trackerServerPool.borrowObject();
 45         } catch (FileparseException e) {
 46             throw new FileparseException(ResponseCode.GET_DFS_CONNECT_ERROR);
 47         }
 48 
 49         return trackerServer;
 50     }
 51 
 52     public static String upload(File file) throws FileparseException {
 53       return upload(file, null);
 54     }
 55 
 56     public static String upload(File file, String groupName) throws FileparseException {
 57         String filePath;
 58         FileInputStream in = null;
 59         try {
 60             in = new FileInputStream(file);
 61             String fileExtName = FilenameUtils.getExtension(file.getName());
 62             // 設定檔案元資訊
 63             NameValuePair[] metaList = new NameValuePair[3];
 64             metaList[0] = new NameValuePair("fileExtName", fileExtName);
 65             metaList[1] = new NameValuePair("fileLength", String.valueOf(file.length()));
 66             metaList[2] = new NameValuePair("fileName", getFileNameString(file.getName()));
 67             filePath = upload(null, in.getChannel().size(), new UploadStream(in, file.length()), fileExtName, metaList);
 68         } catch (Exception e) {
 69             log.error("上傳檔案到fastdfs失敗!檔名:{}", file.getName());
 70             throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR);
 71         } finally {
 72             try {
 73                 if (in != null) {
 74                     in.close();
 75                 }
 76             } catch (IOException e) {
 77                 log.error("close FileInputStream failed...{}", ExceptionUtils.getFullStackTrace(e));
 78             }
 79         }
 80         return filePath;
 81 
 82     }
 83 
 84     public static String upload(InputStream in, String fileName) throws FileparseException {
 85         String filePath;
 86         try {
 87             String fileExtName = FilenameUtils.getExtension(fileName);
 88             // 設定檔案元資訊
 89             NameValuePair[] metaList = new NameValuePair[3];
 90             metaList[0] = new NameValuePair("fileExtName", fileExtName);
 91             metaList[1] = new NameValuePair("fileLength", String.valueOf(in.available()));
 92             metaList[2] = new NameValuePair("fileName", getFileNameString(fileName));
 93             filePath = upload(null, in.available(),  new UploadStream(in, in.available()), fileExtName, metaList);
 94         } catch (Exception e) {
 95             log.error("上傳檔案到fastdfs失敗!檔名:{}", fileName);
 96             throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR);
 97         } finally {
 98             try {
 99                 if (in != null) {
100                     in.close();
101                 }
102             } catch (IOException e) {
103                 log.error("close FileInputStream failed...{}", ExceptionUtils.getFullStackTrace(e));
104             }
105         }
106         return filePath;
107 
108     }
109 
110     /**
111      * 上傳通用方法
112      */
113     public static String upload(String groupName, long file_size, UploadCallback callback, String file_ext_name, NameValuePair[] meta_list) throws FileparseException {
114         TrackerServerPool trackerServerPool = SpringBeanUtil.getBean(TrackerServerPool.class);
115         TrackerServer trackerServer = trackerServerPool.borrowObject();
116         StorageClient1 storageClient = new StorageClient1(trackerServer, null);
117         String path = null;
118         try {
119             // 上傳
120             path = storageClient.upload_file1(groupName, file_size, callback, file_ext_name, meta_list);
121             if (StringUtils.isBlank(path)) {
122                 throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR);
123             }
124             log.info("上傳檔案到fastdfs成功!檔案地址:{}", path);
125         } catch (IOException e) {
126             log.error("上傳檔案到fastdfs失敗!失敗原因:{}", ExceptionUtils.getFullStackTrace(e));
127             throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR);
128         } catch (MyException e) {
129             log.error("上傳檔案到fastdfs失敗!失敗原因:{}", ExceptionUtils.getFullStackTrace(e));
130             throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR);
131         } finally {
132             // 返還物件
133             trackerServerPool.checkin(trackerServer);
134         }
135         return path;
136     }
137 
138 
139     public static void download(String filePathName, File file) throws FileNotFoundException, FileparseException {
140         if (!file.exists()) {
141             File parentFile = file.getParentFile();
142             if (!parentFile.exists()) {
143                 parentFile.mkdirs();
144             }
145         }
146         FileOutputStream in = null;
147         try {
148             in = new FileOutputStream(file);
149             download(filePathName, in);
150         } catch (Exception e) {
151             log.error("從fastdfs下載檔案失敗!失敗原因:{}", ExceptionUtils.getFullStackTrace(e));
152             throw new FileparseException(ResponseCode.DOWN_FILE_ERROR_1);
153         }
154     }
155     public static void download(String filePathName, OutputStream output) throws FileparseException {
156         TrackerServerPool trackerServerPool = SpringBeanUtil.getBean(TrackerServerPool.class);
157         TrackerServer trackerServer = trackerServerPool.borrowObject();
158         StorageClient1 storageClient = new StorageClient1(trackerServer, null);
159         try {
160             storageClient.download_file1(filePathName, new DownloadStream(output));
161             log.info("從fastdfs下載檔案成功!檔名稱:{}", filePathName);
162         } catch (Exception e) {
163             log.error("從fastdfs下載檔案失敗,檔案路徑:{}", filePathName);
164             throw new FileparseException(ResponseCode.DOWN_FILE_ERROR.getDesc()+e.getMessage(), ResponseCode.DOWN_FILE_ERROR.getCode());
165         }finally {
166             // 返還物件
167             trackerServerPool.checkin(trackerServer);
168         }
169     }
170 
171     /**
172      * @Description: 根據檔案路徑刪除儲存伺服器上檔案
173      * @Param: [filePath]  SysDocFile.getFilePath()
174      * @return: boolean
175      * @Author: wanfy
176      * @Date: 2019/4/23 10:08 AM
177      */
178     public static boolean deleteFile(String filePath) throws FileparseException{
179         boolean success = false;
180         TrackerServerPool trackerServerPool = SpringBeanUtil.getBean(TrackerServerPool.class);
181         TrackerServer trackerServer = trackerServerPool.borrowObject();
182         StorageClient1 storageClient = new StorageClient1(trackerServer, null);
183         try {
184             String group_name = filePath.split("/")[0];
185             String remote_filename = filePath.substring(group_name.length() + 1);
186             int i = storageClient.delete_file(group_name, remote_filename);
187             if (i == 0) {
188                 log.info("已從fastdfs上刪除檔案");
189                 success = true;
190             }
191         } catch (Exception e) {
192             log.warn("從fastdfs上刪除檔案失敗");
193             throw new FileparseException(ResponseCode.DEF_ERROR_CODE);
194         } finally {
195             // 返還物件
196             trackerServerPool.checkin(trackerServer);
197         }
198         return success;
199     }
200 
201     /**
202      * @Description: 獲取檔案大小
203      * @Param: [filePath]  SysDocFile.getFilePath()
204      * @return: boolean
205      * @Author: wanfy
206      * @Date: 2019/4/23 10:08 AM
207      */
208     public static long getFileSize(String filePath) throws FileparseException{
209         TrackerServerPool trackerServerPool = SpringBeanUtil.getBean(TrackerServerPool.class);
210         TrackerServer trackerServer = trackerServerPool.borrowObject();
211         StorageClient1 storageClient = new StorageClient1(trackerServer, null);
212         try {
213             String group_name = filePath.split("/")[0];
214             String remote_filename = filePath.substring(group_name.length() + 1);
215             FileInfo fileInfo = storageClient.get_file_info(group_name, remote_filename);
216             if (fileInfo == null) {
217                 log.warn("fastdfs查詢檔案為空");
218                 throw new FileparseException(ResponseCode.DOWN_FILE_IS_NULL);
219             }
220             return fileInfo.getFileSize();
221         } catch (Exception e) {
222             log.warn("從fastdfs上獲取檔案大小失敗");
223             throw new FileparseException(ResponseCode.DOWN_FILE_ERROR.getDesc()+e.getMessage(), ResponseCode.DOWN_FILE_ERROR.getCode());
224         } finally {
225             // 返還物件
226             trackerServerPool.checkin(trackerServer);
227         }
228     }
229 
230     private static String getFileNameString(String fileName) {
231         String str1;
232         if (fileName.indexOf(".") != -1) {
233             str1 = fileName.substring(0, fileName.lastIndexOf("."));
234         } else {
235             str1 = fileName;
236         }
237         return str1;
238     }
239 }

簡單寫幾個工具類

  1 /**
  2  *
  3  */
  4 package cn.com.fileparse.util.old;
  5 
  6 import cn.com.fileparse.client.fdfs.FastDFSClient;
  7 import cn.com.fileparse.exception.FileparseException;
  8 import lombok.extern.slf4j.Slf4j;
  9 import org.apache.commons.io.FilenameUtils;
 10 import org.csource.common.NameValuePair;
 11 import org.csource.fastdfs.StorageClient1;
 12 import org.csource.fastdfs.StorageServer;
 13 import org.csource.fastdfs.UploadStream;
 14 import org.springframework.stereotype.Component;
 15 
 16 import java.io.File;
 17 import java.io.FileInputStream;
 18 import java.io.FileNotFoundException;
 19 import java.io.FileOutputStream;
 20 import java.io.IOException;
 21 import java.io.InputStream;
 22 import java.io.OutputStream;
 23 
 24 /**
 25  * @author jiaqiangx
 26  */
 27 @Component
 28 @Slf4j
 29 public class FastDFSUtil {
 30 
 31     public static String upload(File file) throws FileparseException{
 32         return FastDFSClient.upload(file, null);
 33     }
 34 
 35     public static String upload(InputStream in, String fileName) throws FileparseException{
 36         return FastDFSClient.upload(in, fileName);
 37     }
 38 
 39     /**
 40     * @Description:上傳多個檔案到本機的storage伺服器 (供CA服務專用)
 41     * @Param: [file]
 42     * @return: java.lang.String
 43     * @Author: wanfy
 44     * @Date: 2019/5/21 3:44 PM
 45     */
 46     public static String uploadFileToLocalhostStorage(File file) {
 47         String filePath;
 48         StorageServer storageServer = null;
 49         try (FileInputStream in = new FileInputStream(file)) {
 50             String fileExtName = FilenameUtils.getExtension(file.getName());
 51             storageServer = new StorageServer("localhost", 23000, 0);
 52             StorageClient1 client = new StorageClient1(null, storageServer);
 53             // 設定檔案元資訊
 54             NameValuePair[] metaList = new NameValuePair[3];
 55             metaList[0] = new NameValuePair("fileExtName", fileExtName);
 56             metaList[1] = new NameValuePair("fileLength", String.valueOf(file.length()));
 57             metaList[2] = new NameValuePair("fileName", getFileNameString(file.getName()));
 58             filePath = client.upload_file1(null, in.getChannel().size(), new UploadStream(in, file.length()), fileExtName, metaList);
 59             log.info("上傳檔案到fastdfs成功!");
 60         } catch (Exception e) {
 61             log.error("上傳檔案到fastdfs失敗!檔名:{}", file.getName());
 62             throw new RuntimeException(e);
 63         } finally {
 64             try {
 65                 if (storageServer != null) {
 66                     storageServer.close();
 67                 }
 68             } catch (IOException e) {
 69                 log.warn("storageServer關閉失敗");
 70             }
 71         }
 72         return filePath;
 73     }
 74 
 75     public static void download(String filePathName, File file) throws FileNotFoundException,FileparseException {
 76         if (!file.exists()) {
 77             File parentFile = file.getParentFile();
 78             if (!parentFile.exists()) {
 79                 parentFile.mkdirs();
 80             }
 81         }
 82         download(filePathName, new FileOutputStream(file));
 83     }
 84 
 85     public static void download(String filePathName, OutputStream output) throws FileparseException {
 86             FastDFSClient.download(filePathName,output);
 87     }
 88 
 89     /**
 90      * @Description: 根據檔案路徑刪除儲存伺服器上檔案
 91      * @Param: [filePath]  SysDocFile.getFilePath()
 92      * @return: boolean
 93      * @Author: wanfy
 94      * @Date: 2019/4/23 10:08 AM
 95      */
 96     public static boolean deleteFile(String filePath) throws FileparseException {
 97         return FastDFSClient.deleteFile(filePath);
 98     }
 99 
100     /**
101      * @Description: 獲取檔案大小
102      * @Param: [filePath]  SysDocFile.getFilePath()
103      * @return: boolean
104      * @Author: wanfy
105      * @Date: 2019/4/23 10:08 AM
106      */
107     public static long getFileSize(String filePath) throws FileparseException{
108        return FastDFSClient.getFileSize(filePath);
109     }
110 
111     private static String getFileNameString(String fileName) {
112         String str1;
113         if (fileName.indexOf(".") != -1) {
114             str1 = fileName.substring(0, fileName.lastIndexOf("."));
115         } else {
116             str1 = fileName;
117         }
118         return str1;
119     }
120 
121 }

測試類

 1 package cn.com.fileparse.utils;
 2 
 3 import cn.com.fileparse.client.fdfs.FastDFSClient;
 4 import cn.com.fileparse.util.old.FastDFSUtil;
 5 import org.csource.fastdfs.TrackerServer;
 6 import org.junit.Test;
 7 import org.junit.runner.RunWith;
 8 import org.springframework.boot.test.context.SpringBootTest;
 9 import org.springframework.test.context.junit4.SpringRunner;
10 
11 import java.io.File;
12 
13 /**
14  * @author jiaqiangx
15  * @program 
16  * @description
17  * @create 2021-07-12 09:29
18  */
19 @SpringBootTest
20 @RunWith(SpringRunner.class)
21 public class DFSUtilsTest {
22     @Test
23     public void upload(){
24         try {
25             File pdf = new File("D:\\工作\\s.png");
26            String asd = FastDFSUtil.upload(pdf);
27             System.out.println(asd);
28         } catch (Exception e) {
29             e.printStackTrace();
30         }
31     }
32 
33     @Test
34     public void getStorageClient(){
35         try {
36             TrackerServer trackerServer = FastDFSClient.getStorageClient();
37             trackerServer.getSocket();
38         } catch (Exception e) {
39             e.printStackTrace();
40         }
41     }
42 
43 }