1. 程式人生 > >使用commons-pool2實現FTP連線池

使用commons-pool2實現FTP連線池

一. 連線池概述

​ 頻繁的建立和關閉連線,會極大的降低系統的效能,而連線池會在初始化的時候會建立一定數量的連線,每次訪問只需從連線池裡獲取連線,使用完畢後再放回連線池,並不是直接關閉連線,這樣可以保證程式重複使用同一個連線而不需要每次訪問都建立和關閉連線, 從而提高系統性能。

二. commons-pool2介紹

2.1 pool2的引入

<!-- 使用commons-pool2 實現ftp連線池 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.5.0</version>
</dependency>

<!-- 引入FTPClient作為池化物件 -->
<dependency>
    <groupId>commons-net</groupId>
    <artifactId>commons-net</artifactId>
    <version>3.6</version>
</dependency>

2.2 pool2的組成

PooledObject(池化物件) PooledObjectFactory(物件工廠) ObjectPool (物件池)

對應為:

FTPClient(池化物件) FTPClientFactory(物件工廠) FTPClientPool(物件池)

關係圖:

關係圖

三. 實現連線池

3.1 配置FtpClient

我們已經有現成的池化物件(FtpClient)了,只需要新增配置即可

@ConfigurationProperties(ignoreUnknownFields = false, prefix = "ftp.client")
public class FtpClientProperties {
    // ftp地址
    private String host;
    // 埠號
    private Integer port = 21;
    // 登入使用者
    private String username;
    // 登入密碼
    private String password;
    // 被動模式
    private boolean passiveMode = false;
    // 編碼
    private String encoding = "UTF-8";
    // 連線超時時間(秒)
    private Integer connectTimeout;
    // 緩衝大小
    private Integer bufferSize = 1024;
    // 傳輸檔案型別
    private Integer transferFileType;
}

application.properties配置為:

ftp.client.host=127.0.0.1
ftp.client.port=22
ftp.client.username=root
ftp.client.password=root
ftp.client.encoding=utf-8
ftp.client.passiveMode=false
ftp.client.connectTimeout=30000

3.2 建立FtpClientFactory

​ 在commons-pool2中有兩種工廠:PooledObjectFactory 和KeyedPooledObjectFactory,我們使用前者。

public interface PooledObjectFactory<T> {
    //建立物件
    PooledObject<T> makeObject();
    //啟用物件
    void activateObject(PooledObject<T> obj);
    //鈍化物件
    void passivateObject(PooledObject<T> obj);
    //驗證物件
    boolean validateObject(PooledObject<T> obj);
    //銷燬物件
    void destroyObject(PooledObject<T> obj);
}

​ 建立FtpClientFactory只需要繼承BasePooledObjectFactory這個抽象類 ,而它則實現了PooledObjectFactory

public class FtpClientFactory extends BasePooledObjectFactory<FTPClient> {

    private FtpClientProperties config;

    public FtpClientFactory(FtpClientProperties config) {
        this.config = config;
    }

    /**
     * 建立FtpClient物件
     */
    @Override
    public FTPClient create() {
        FTPClient ftpClient = new FTPClient();
        ftpClient.setControlEncoding(config.getEncoding());
        ftpClient.setConnectTimeout(config.getConnectTimeout());
        try {

            ftpClient.connect(config.getHost(), config.getPort());
            int replyCode = ftpClient.getReplyCode();
            if (!FTPReply.isPositiveCompletion(replyCode)) {
                ftpClient.disconnect();
                log.warn("FTPServer refused connection,replyCode:{}", replyCode);
                return null;
            }

            if (!ftpClient.login(config.getUsername(), config.getPassword())) {
                log.warn("ftpClient login failed... username is {}; password: {}", config.getUsername(), config.getPassword());
            }

            ftpClient.setBufferSize(config.getBufferSize());
            ftpClient.setFileType(config.getTransferFileType());
            if (config.isPassiveMode()) {
                ftpClient.enterLocalPassiveMode();
            }

        } catch (IOException e) {
            log.error("create ftp connection failed...", e);
        }
        return ftpClient;
    }

    /**
     * 用PooledObject封裝物件放入池中
     */
    @Override
    public PooledObject<FTPClient> wrap(FTPClient ftpClient) {
        return new DefaultPooledObject<>(ftpClient);
    }

    /**
     * 銷燬FtpClient物件
     */
    @Override
    public void destroyObject(PooledObject<FTPClient> ftpPooled) {
        if (ftpPooled == null) {
            return;
        }

        FTPClient ftpClient = ftpPooled.getObject();

        try {
            if (ftpClient.isConnected()) {
                ftpClient.logout();
            }
        } catch (IOException io) {
            log.error("ftp client logout failed...{}", io);
        } finally {
            try {
                ftpClient.disconnect();
            } catch (IOException io) {
                log.error("close ftp client failed...{}", io);
            }
        }
    }

    /**
     * 驗證FtpClient物件
     */
    @Override
    public boolean validateObject(PooledObject<FTPClient> ftpPooled) {
        try {
            FTPClient ftpClient = ftpPooled.getObject();
            return ftpClient.sendNoOp();
        } catch (IOException e) {
            log.error("Failed to validate client: {}", e);
        }
        return false;
    }

}

3.3 實現FtpClientPool

​ 在commons-pool2中預設了三個可以直接使用的物件池:GenericObjectPool、GenericKeyedObjectPool和SoftReferenceObjectPool

示列:

GenericObjectPool<FTPClient> ftpClientPool = new GenericObjectPool<>(new FtpClientFactory());

我們也可以自己實現一個連線池:

public interface ObjectPool<T> extends Closeable {
    // 從池中獲取一個物件
    T borrowObject();
    // 歸還一個物件到池中
    void returnObject(T obj);
    // 廢棄一個失效的物件
    void invalidateObject(T obj); 
    // 新增物件到池
    void addObject();
    // 清空物件池
    void clear();
    // 關閉物件池
    void close();
}

通過繼承BaseObjectPool去實現ObjectPool

public class FtpClientPool extends BaseObjectPool<FTPClient> {

    private static final int DEFAULT_POOL_SIZE = 8;

    private final BlockingQueue<FTPClient> ftpBlockingQueue;
    private final FtpClientFactory ftpClientFactory;


    /**
     * 初始化連線池,需要注入一個工廠來提供FTPClient例項
     *
     * @param ftpClientFactory ftp工廠
     * @throws Exception
     */
    public FtpClientPool(FtpClientFactory ftpClientFactory) throws Exception {
        this(DEFAULT_POOL_SIZE, ftpClientFactory);
    }

    public FtpClientPool(int poolSize, FtpClientFactory factory) throws Exception {
        this.ftpClientFactory = factory;
        ftpBlockingQueue = new ArrayBlockingQueue<>(poolSize);
        initPool(poolSize);
    }

    /**
     * 初始化連線池,需要注入一個工廠來提供FTPClient例項
     *
     * @param maxPoolSize 最大連線數
     * @throws Exception
     */
    private void initPool(int maxPoolSize) throws Exception {
        for (int i = 0; i < maxPoolSize; i++) {
            // 往池中新增物件
            addObject();
        }
    }

    /**
     * 從連線池中獲取物件
     */
    @Override
    public FTPClient borrowObject() throws Exception {
        FTPClient client = ftpBlockingQueue.take();
        if (ObjectUtils.isEmpty(client)) {
            client = ftpClientFactory.create();
            // 放入連線池
            returnObject(client);
            // 驗證物件是否有效
        } else if (!ftpClientFactory.validateObject(ftpClientFactory.wrap(client))) {
            // 對無效的物件進行處理
            invalidateObject(client);
            // 建立新的物件
            client = ftpClientFactory.create();
            // 將新的物件放入連線池
            returnObject(client);
        }
        return client;
    }

    /**
     * 返還物件到連線池中
     */
    @Override
    public void returnObject(FTPClient client) {
        try {
            if (client != null && !ftpBlockingQueue.offer(client, 3, TimeUnit.SECONDS)) {
                ftpClientFactory.destroyObject(ftpClientFactory.wrap(client));
            }
        } catch (InterruptedException e) {
            log.error("return ftp client interrupted ...{}", e);
        }
    }

    /**
     * 移除無效的物件
     */
    @Override
    public void invalidateObject(FTPClient client) {
        try {
            client.changeWorkingDirectory("/");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            ftpBlockingQueue.remove(client);
        }
    }

    /**
     * 增加一個新的連結,超時失效
     */
    @Override
    public void addObject() throws Exception {
        // 插入物件到佇列
        ftpBlockingQueue.offer(ftpClientFactory.create(), 3, TimeUnit.SECONDS);
    }

    /**
     * 關閉連線池
     */
    @Override
    public void close() {
        try {
            while (ftpBlockingQueue.iterator().hasNext()) {
                FTPClient client = ftpBlockingQueue.take();
                ftpClientFactory.destroyObject(ftpClientFactory.wrap(client));
            }
        } catch (Exception e) {
            log.error("close ftp client ftpBlockingQueue failed...{}", e);
        }
    }

}

不太贊成自己去實現連線池,這樣會帶來額外的維護成本...

四. 程式碼地址:

GitHub : https://github.com/jayknoxqu/ftp-pool

碼雲 : https://gitee.com/jayknoxqu/ftp-pool

五. 參考資料:

FTPClient連線池的實現: https://yq.aliyun.com/articles/5904

Apache Commons-pool2(整理): https://www.jianshu.com/p/b0189e01de35

官方案列: http://commons.apache.org/proper/commons-pool/examples.html


轉載自 https://www.jianshu.com/p/292d376179b1

spring配置如下

  
    <bean id="ftpFactory" class="fs.basecore.utils.FtpFactory" ></bean> 
     
	 <bean id="ftpPool"  class="fs.basecore.utils.FtpClientPool" >
	 <constructor-arg>
		   <ref bean="ftpFactory"/>
	</constructor-arg>
	</bean>

如果出現  FTP response 421 received. Server closed connection.需要重新建立連線,需要設定連結超時時間等

        ftp.setConnectTimeout(30000);
		ftp.setDataTimeout(180000);
		ftp.setControlKeepAliveTimeout(60);

  使用連線池需要注意的是設定傳輸超時時間和連線超時時間,另外就是當使用ftp.retrieveFileStream方法時,記得執行

 ftp.completePendingCommand();

inputStream = ftp.retrieveFileStream(file.getName());
                     ServletOutputStream outputStream = response.getOutputStream();
                   //讀取檔案流
             		int len = 0;
             		byte[] buffer = new byte[1024 * 10];
             		while ((len = inputStream.read(buffer)) != -1){
             			outputStream.write(buffer,0,len);
             		}
             		outputStream.flush();
             		outputStream.close();
             		inputStream.close();
             		boolean completePendingCommand = ftp.completePendingCommand();
             		if(!completePendingCommand){
             			//移除此物件並重新建一個新的ftpClient並新增至池
             		}