1. 程式人生 > 其它 >使用Apache commons-pool2實現高效的FTPClient連線池的方法

使用Apache commons-pool2實現高效的FTPClient連線池的方法

一. 連線池概述
​ 頻繁的建立和關閉連線,會極大的降低系統的效能,而連線池會在初始化的時候會建立一定數量的連線,每次訪問只需從連線池裡獲取連線,使用完畢後再放回連線池,並不是直接關閉連線,這樣可以保證程式重複使用同一個連線而不需要每次訪問都建立和關閉連線, 從而提高系統性能。有些物件的建立開銷是比較大的,比如資料庫連線等。為了減少頻繁建立、銷燬物件帶來的效能消耗,我們可以利用物件池的技術來實現物件的複用。物件池提供了一種機制,它可以管理物件池中物件的生命週期,提供了獲取和釋放物件的方法,可以讓客戶端很方便的使用物件池中的物件。

二. commons-pool2介紹
2.1 pool2的引入

<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.6</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
2.2 pool2的組成

PooledObject(池化物件) PooledObjectFactory(物件工廠) ObjectPool (物件池)

對應為: FTPClient(池化物件) FTPClientFactory(物件工廠) FTPClientPool(物件池)

關係圖:

關係圖

三. 實現連線池
3.1 配置FtpClient

我們已經有現成的池化物件(FtpClient)了,只需要新增配置即可,FTPClientConfig【FTP連線配置類】


/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.tompai.ftp.pool;

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

/**
* @desc: demo
* @name: FTPClientConfig.java
* @author: tompai
* @email:[email protected]
* @createTime: 2019年12月31日 下午12:53:48
* @history:
* @version: v1.0
*/

public class FTPClientConfig extends GenericObjectPoolConfig<FTPClient> {

private String host;// 主機名
private int port = 21;// 埠
private String username;// 使用者名稱
private String password;// 密碼

private int connectTimeOut = 5000;// ftp 連線超時時間 毫秒
private String controlEncoding = "utf-8";
private int bufferSize = 1024;// 緩衝區大小
private int fileType = 2;// 傳輸資料格式 2表binary二進位制資料
private int dataTimeout = 120 * 1000;
private boolean useEPSVwithIPv4 = false;
private boolean passiveMode = true;// 是否啟用被動模式
private int threadNum=1;//開啟執行緒數
private int transferFileType=FTPClient.BINARY_FILE_TYPE;//傳輸檔案型別
private boolean renameUploaded=false;//是否上傳檔案重新命名;
private int retryTimes=3;//重試次數

public String getHost() {

return host;
}

public void setHost(String host) {

this.host = host;
}

public int getPort() {

return port;
}

public void setPort(int port) {

this.port = port;
}

public String getUsername() {

return username;
}

public void setUsername(String username) {

this.username = username;
}

public String getPassword() {

return password;
}

public void setPassword(String password) {

this.password = password;
}

public int getConnectTimeOut() {

return connectTimeOut;
}

public void setConnectTimeOut(int connectTimeOut) {

this.connectTimeOut = connectTimeOut;
}

public String getControlEncoding() {

return controlEncoding;
}

public void setControlEncoding(String controlEncoding) {

this.controlEncoding = controlEncoding;
}

public int getBufferSize() {

return bufferSize;
}

public void setBufferSize(int bufferSize) {

this.bufferSize = bufferSize;
}

public int getFileType() {

return fileType;
}

public void setFileType(int fileType) {

this.fileType = fileType;
}

public int getDataTimeout() {

return dataTimeout;
}

public void setDataTimeout(int dataTimeout) {

this.dataTimeout = dataTimeout;
}

public boolean isUseEPSVwithIPv4() {

return useEPSVwithIPv4;
}

public void setUseEPSVwithIPv4(boolean useEPSVwithIPv4) {

this.useEPSVwithIPv4 = useEPSVwithIPv4;
}

public boolean isPassiveMode() {

return passiveMode;
}

public void setPassiveMode(boolean passiveMode) {

this.passiveMode = passiveMode;
}

public int getThreadNum() {

return threadNum;
}

public void setThreadNum(int threadNum) {

this.threadNum = threadNum;
}

public int getTransferFileType() {

return transferFileType;
}

public void setTransferFileType(int transferFileType) {

this.transferFileType = transferFileType;
}

public boolean isRenameUploaded() {

return renameUploaded;
}

public void setRenameUploaded(boolean renameUploaded) {

this.renameUploaded = renameUploaded;
}

public int getRetryTimes() {

return retryTimes;
}

public void setRetryTimes(int retryTimes) {

this.retryTimes = retryTimes;
}

@Override
public String toString() {
return "{\"host\":\"" + host + "\", \"port\":\"" + port + "\", \"username\":\"" + username
+ "\", \"password\":\"" + password + "\", \"connectTimeOut\":\"" + connectTimeOut
+ "\", \"controlEncoding\":\"" + controlEncoding + "\", \"bufferSize\":\"" + bufferSize
+ "\", \"fileType\":\"" + fileType + "\", \"dataTimeout\":\"" + dataTimeout
+ "\", \"useEPSVwithIPv4\":\"" + useEPSVwithIPv4 + "\", \"passiveMode\":\"" + passiveMode
+ "\", \"threadNum\":\"" + threadNum + "\", \"transferFileType\":\"" + transferFileType
+ "\", \"renameUploaded\":\"" + renameUploaded + "\", \"retryTimes\":\"" + retryTimes + "\"}";
}

}
3.2 建立FTPClientFactory

​​ 在commons-pool2中有兩種工廠:PooledObjectFactory 和KeyedPooledObjectFactory,在此使用PooledObjectFactory。


/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.tompai.ftp.pool;

import java.io.IOException;

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @desc: demo
* @name: FTPClientFactory.java
* @author: tompai
* @email:[email protected]
* @createTime: 2019年12月31日 下午12:52:02
* @history:
* @version: v1.0
*/

public class FTPClientFactory extends BasePooledObjectFactory<FTPClient> {

private static Logger logger = LoggerFactory.getLogger(FTPClientFactory.class);
private FTPClientConfig config;

public FTPClientConfig getFtpPoolConfig() {
return config;
}

public void setFtpPoolConfig(FTPClientConfig ftpPoolConfig) {
this.config = ftpPoolConfig;
}

/**
* 新建物件
*/
@Override
public FTPClient create() throws Exception {
FTPClient ftpClient = new FTPClient();
ftpClient.setConnectTimeout(config.getConnectTimeOut());
try {
logger.info("連線ftp伺服器:" + config.getHost() + ":" + config.getPort());
ftpClient.connect(config.getHost(), config.getPort());
int reply = ftpClient.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
ftpClient.disconnect();
logger.error("FTPServer 拒絕連線!");
return null;
}
boolean result = ftpClient.login(config.getUsername(), config.getPassword());
if (!result) {
logger.error("ftpClient登入失敗!");
throw new Exception(
"ftpClient登入失敗! userName:" + config.getUsername() + ", password:" + config.getPassword());
}

ftpClient.setControlEncoding(config.getControlEncoding());
ftpClient.setBufferSize(config.getBufferSize());
ftpClient.setFileType(config.getFileType());
ftpClient.setDataTimeout(config.getDataTimeout());
ftpClient.setUseEPSVwithIPv4(config.isUseEPSVwithIPv4());
if (config.isPassiveMode()) {
logger.info("進入ftp被動模式");
ftpClient.enterLocalPassiveMode();// 進入被動模式
}
} catch (IOException e) {
logger.error("FTP連線失敗:", e);
}
return ftpClient;
}

@Override
public PooledObject<FTPClient> wrap(FTPClient ftpClient) {
return new DefaultPooledObject<FTPClient>(ftpClient);
}

/**
* 銷燬物件
*/
@Override
public void destroyObject(PooledObject<FTPClient> p) throws Exception {
FTPClient ftpClient = p.getObject();
if (ftpClient != null && ftpClient.isConnected()) {
ftpClient.logout();
ftpClient.disconnect();
super.destroyObject(p);
}
}

/**
* 驗證物件
*/
@Override
public boolean validateObject(PooledObject<FTPClient> p) {
FTPClient ftpClient = p.getObject();
boolean connect = false;
try {
connect = ftpClient.sendNoOp();
} catch (IOException e) {
e.printStackTrace();
}
return connect;
}

/**
* No-op.
*
* @param p ignored
*/
@Override
public void activateObject(PooledObject<FTPClient> p) throws Exception {
// The default implementation is a no-op.
}

/**
* No-op.
*
* @param p ignored
*/
@Override
public void passivateObject(PooledObject<FTPClient> p) throws Exception {
// The default implementation is a no-op.
}
}
3.3 實現FTPClientPool

​ 在commons-pool2中預設了三個可以直接使用的物件池:GenericObjectPool、GenericKeyedObjectPool和SoftReferenceObjectPool,在此使用GenericObjectPool<FTPClient>


/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.tompai.ftp.pool;

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @desc: demo
* @name: FTPClientPool.java
* @author: tompai
* @email:[email protected]
* @createTime: 2019年12月31日 下午12:54:10
* @history:
* @version: v1.0
*/

public class FTPClientPool {

private static Logger logger =LoggerFactory.getLogger(FTPClientPool.class);
private GenericObjectPool<FTPClient> pool;
private FTPClientFactory factory;

public FTPClientPool(FTPClientFactory clientFactory) {
this.factory = clientFactory;
pool = new GenericObjectPool<FTPClient>(clientFactory, clientFactory.getFtpPoolConfig());
}

public FTPClientFactory getClientFactory() {
return factory;
}

public GenericObjectPool<FTPClient> getPool() {
return pool;
}

/**
* 借 獲取一個連線物件
*
* @return
* @throws Exception
*/
public FTPClient borrowObject() throws Exception {
FTPClient client = pool.borrowObject();
if (!client.sendNoOp()) {
// 使池中的物件無效
client.logout();
client.disconnect();
pool.invalidateObject(client);
client = factory.create();
pool.addObject();
}
return client;

}

/**
* 還 歸還一個連線物件
*
* @param ftpClient
*/
public void returnObject(FTPClient ftpClient) {

if (ftpClient != null) {
pool.returnObject(ftpClient);
}
}
}
3.4 連線池使用工具類FTPClientHelper


/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.tompai.ftp.pool;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.io.IOUtils;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @desc: demo
* @name: FTPClientHelper.java
* @author: tompai
* @email:[email protected]
* @createTime: 2019年12月31日 下午1:14:20
* @history:
* @version: v1.0
*/

public class FTPClientHelper implements AutoCloseable {

private static Logger logger = LoggerFactory.getLogger(FTPClientHelper.class);
private FTPClientPool pool;

public void setFtpClientPool(FTPClientPool ftpClientPool) {
this.pool = ftpClientPool;
}

/**
* 列出目錄下的所有檔案
* @author: tompai
* @createTime: 2019年12月31日 下午1:56:02
* @history:
* @param pathname
* @return
* @throws Exception FTPFile[]
*/
public FTPFile[] listFiles(String pathname) {
FTPClient client = getClient();
try {
return client.listFiles(pathname);
} catch (IOException e) {
//TODO Auto-generated catch block
logger.error(e.getMessage());
e.printStackTrace();
}finally {
pool.returnObject(client);
}
return null;
}

/**
* 下載 remote檔案流
* @author: tompai
* @createTime: 2019年12月31日 下午1:52:07
* @history:
* @param remote
* @return byte[]
*/
public byte[] retrieveFileStream(String remote) {
FTPClient client = getClient();
InputStream in = null;
//byte[] result=new Byte[]
try {
in = client.retrieveFileStream(remote);
return IOUtils.toByteArray(in);
} catch (IOException e) {
logger.error(e.getMessage());
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
pool.returnObject(client);
}
return null;
}

/**
* 上傳檔案
* @author: tompai
* @createTime: 2019年12月31日 下午1:53:07
* @history:
* @param remote
* @param local
* @return
* @throws Exception boolean
*/
public boolean storeFile(String remote, InputStream local) throws Exception {
FTPClient client = getClient();
try {
return client.storeFile(remote, local);
} finally {
pool.returnObject(client);
}
}

/**
* 建立目錄 單個不可遞迴
* @author: tompai
* @createTime: 2019年12月31日 下午1:52:24
* @history:
* @param pathname
* @return
* @throws Exception boolean
*/
public boolean makeDirectory(String pathname) throws Exception {
FTPClient client = getClient();
try {
return client.makeDirectory(pathname);
} finally {
pool.returnObject(client);
}
}

/**
* 刪除目錄,單個不可遞迴
* @author: tompai
* @createTime: 2019年12月31日 下午1:52:42
* @history:
* @param pathname
* @return
* @throws Exception boolean
*/
public boolean removeDirectory(String pathname) throws Exception {
FTPClient client = getClient();
try {
return client.removeDirectory(pathname);
} finally {
pool.returnObject(client);
}
}

/**
* 刪除檔案 單個 ,不可遞迴
* @author: tompai
* @createTime: 2019年12月31日 下午1:52:54
* @history:
* @param pathname
* @return
* @throws Exception boolean
*/
public boolean deleteFile(String pathname) throws Exception {

FTPClient client = getClient();
try {
return client.deleteFile(pathname);
} finally {
pool.returnObject(client);
}
}

/**
* 獲取一個連線物件
*
* @author: tompai
* @createTime: 2019年12月31日 下午1:45:46
* @history:
* @return FTPClient
*/
private FTPClient getClient() {
FTPClient client = null;
try {
client = pool.borrowObject();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return client;
}

/**
* 釋放連線
*
* @author: tompai
* @createTime: 2019年12月31日 下午1:45:03
* @history:
* @param client void
*/
private void releaseClient(FTPClient client) {
if (client == null) {
return;
}
try {
pool.returnObject(client);
} catch (Exception e) {
logger.error("Could not return the ftpClient to the pool", e);
// destoryFtpClient
if (client.isAvailable()) {
try {
client.logout();
client.disconnect();
pool.getPool().invalidateObject(client);
} catch (Exception io) {
logger.error(io.getMessage());
}
}
}
}

@Override
public void close() throws Exception {
// TODO Auto-generated method stub
logger.info("---Resources Closed---.");
}
}
3.5 連線池測試類FTPLinkPoolTest

在此可以自己使用FileZilla Server for Windows搭建一個簡單的FTP伺服器;


/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.tompai.ftp.pool;

import org.apache.commons.net.ftp.FTPFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @desc: demo
* @name: FTPLinkPoolTest.java
* @author: tompai
* @email:[email protected]
* @createTime: 2019年12月31日 下午1:13:51
* @history:
* @version: v1.0
*/

public class FTPLinkPoolTest {

private static Logger logger =LoggerFactory.getLogger(FTPClientPool.class);
/**
* @author: tompai
* @createTime: 2019年12月31日 下午1:13:51
* @history:
* @param args void
*/

public static void main(String[] args) {
//TODO Auto-generated method stub
FTPClientConfig conf=new FTPClientConfig();
conf.setHost("127.0.0.1");
conf.setUsername("test");
conf.setPassword("test");

FTPClientFactory factory=new FTPClientFactory();
factory.setFtpPoolConfig(conf);

FTPClientPool pool=new FTPClientPool(factory);

FTPClientHelper clientHelper=new FTPClientHelper();
clientHelper.setFtpClientPool(pool);

String pathname="/0821";
FTPFile[] files=clientHelper.listFiles(pathname);
for(FTPFile file:files) {
String name=file.getName();
logger.info("name:{}",name);
}
}

}

3.6 測試結果