1. 程式人生 > 其它 >Java自定義連線池

Java自定義連線池

連線池

package net.sf.hservice_qrcode.secretkey.pool;

import java.util.Enumeration;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sf.hservice_qrcode.secretkey.tcp.client.TcpClientProxy;

public class ConnPoolImpl implements ConnPool {
	protected transient final Logger logger = LoggerFactory.getLogger(this.getClass());

	// --------------------- 連線池 -----------------
	private int initialSize = 2; // 連線池的初始大小
	private int maxSize = 10; // 連線池最大的大小
	private int incrSize = 1; // 連線池自動增加的大小
	private long maxIdleTime = 30; // 最大空閒時間,超時釋放連線,單位:秒
	private boolean autoExpanse = false; // 啟動自動擴容
	
	// ------------------- 擴容策略 -----------------
	private int perRoundSeconds = 30; // 每輪100毫秒
	private int expansionTimeout = 100 * 4; // 擴容超時,單位: 毫秒
	private int totalWaitTimeout = 1000 * 5; // 總檯等待時間,單位:毫秒

	private ConnParam connParam;

	private boolean inited = false;

	/**
	 * 預準備連線
	 */
	private Vector<PooledConn> prepareConnections;

	/**
	 * 已登入連線
	 */
	private Vector<PooledConn> loginedConnections;

	/**
	 * 空閒連線佇列
	 */
	private Queue<PooledConn> freeConnQueue = new LinkedList<PooledConn>();
	private Lock freeConnQueueLock = new ReentrantLock();
	/**
	 * 已登入連線檢查 定時器
	 */
	private Timer timerPool_LoginedConnectionCheck;
	private long timerPool_LoginedConnectionCheck_period = 100; // 單位:毫秒
	/**
	 * 長時間空閒釋放 定時器
	 */
	private Timer timerPool_LongTimeFreeConnectionCheck;
	private long timerPool_LongTimeFreeConnectionCheck_period = 100; // 單位:毫秒

	/**
	 * 建構函式
	 */
	public ConnPoolImpl(ConnParam connectParam,ConnPoolParam connPoolParam) {
		connParam = connectParam;
		
		this.initialSize = connPoolParam.getInitialSize();
		this.maxSize = connPoolParam.getMaxSize();
		this.incrSize = connPoolParam.getIncrSize();
		this.maxIdleTime = connPoolParam.getMaxIdleTime();
		this.autoExpanse = connPoolParam.isAutoExpanse();
		
		logger.info(String.format("當前處理器核數: %d" , Runtime.getRuntime().availableProcessors()));
		logger.info(String.format("連線池引數: %s, 連線引數: %s", connPoolParam.toString(), connParam.toString()));
		
	}

	/**
	 * 建立連線池
	 */
	@Override
	public void createPool() {
		if (inited) {
			logger.warn("連線池已經初始化,請勿多次初始化");
			return;
		}
		this.prepareConnections = new Vector<PooledConn>();
		this.loginedConnections = new Vector<PooledConn>();

		// 建立初始連線
		createPrepareConn(this.initialSize);

		// -------------- 登入連線檢測 ---------
		this.timerPool_LoginedConnectionCheck = new Timer();

		long firstTime = 1000 * 5; // 延時30秒啟動

		timerPool_LoginedConnectionCheck.schedule(new TimerTask() {
			@Override
			public void run() {
				loginedConnectionsCheck_ThreadRun();
			}
		}, firstTime, timerPool_LoginedConnectionCheck_period);

		// -------------- 閒置連線檢測 ---------
		timerPool_LongTimeFreeConnectionCheck = new Timer();
		firstTime = 1000 * 5; // 延時30秒啟動
		timerPool_LongTimeFreeConnectionCheck.schedule(new TimerTask() {

			@Override
			public void run() {
				// 釋放空閒連線
				longTimeFreeConnectionCheck_ThreadRun();
			}
		}, firstTime, this.timerPool_LongTimeFreeConnectionCheck_period);

		inited = true;
	}

	/**
	 * 建立連線
	 * 
	 * @param count
	 */
	private synchronized void createPrepareConn(int count) {
		for (int x = 0; x < count; x++) {
			if (this.prepareConnections.size() >= this.maxSize) {
				logger.info("[pool]---> 連線池中連線數已到達極限[" + this.maxSize + "],不再擴容");
				break;
			}

			TcpClientProxy conn = newConn();
			PooledConn pooledConn = new PooledConn(conn);
			pooledConn.setBusy(false);
			pooledConn.setPooled(false);
			pooledConn.setQueue(false);
			pooledConn.setLogined(false);

			this.prepareConnections.addElement(pooledConn);

			PoolReturnValue poolReturnValue = getPoolListString(this.prepareConnections);

			logger.info("[pool]---> 預分配連線池新增連線,當前連線池中連線個數=" + poolReturnValue.getIntVal() + ", 當前連線池["
					+ poolReturnValue.getStrVal() + "]");

		}
	}

	/**
	 * 建立一個新的連線並返回
	 * 
	 * @return
	 */
	private synchronized TcpClientProxy newConn() {
		TcpClientProxy conn = null;
		String clientName = String.format("%s-%02d", connParam.getClientName(), this.prepareConnections.size());

		conn = new TcpClientProxy();
		conn.SetParam(clientName, connParam.getHost(), connParam.getPort(), connParam.getTermNo(),
				connParam.getTermMasterKey(), connParam.getConnet_timeout(), connParam.getRecv_timeout());
		conn.start();

		return conn; // 返回建立的新的資料庫連線
	}

	/**
	 * 返回一個可用連線
	 * 
	 * @return
	 */
	@Override
	public PooledConn getConnection() {
		boolean bNeedNewConn = false;
		boolean isExpansed = false;

		// 確保連線池己被建立
		if (!inited) {
			logger.warn("連線池未初始化完成,返回可用連線null");
			return null; // 連線池還沒建立,則返回 null
		}
		PooledConn conn = getFreeConn(); // 獲得一個可用的資料庫連線如果目前沒有可以使用的連線,即所有的連線都在使用中

		int tryCount = 0;

		while (conn == null) {
			// ------------- 自動擴容 ---------------
			if (this.autoExpanse && !isExpansed) {
				// 等待時間大於擴容超時,進行擴容
				if (perRoundSeconds * tryCount > expansionTimeout) {
					bNeedNewConn = true;

					if (bNeedNewConn) {
						logger.info(String.format("[pool]---> 等待超過%dms, 觸發連線池擴容策略,進行擴容", perRoundSeconds * tryCount));
						expansePool();
						isExpansed = true;
					}
				}
			}

//			// 等待時間大於最大等待時間,返回失敗
//			if (perRoundSeconds * tryCount > totalWaitTimeout) {
//				logger.info(String.format("[pool]---> 等待%ds,長時間未分到可用的連線,連線分配失敗返回連線null",
//						perRoundSeconds * tryCount / 1000));
//				break;
//			}

			tryCount++;
			wait(perRoundSeconds);
			conn = getFreeConn(); // 重新再試,直到獲得可用的連線,如果則表明建立一批連線後也不可獲得可用連線
			if (conn != null) {
				logger.info(String.format("[pool]---> 等待%dms, 連線分配成功", perRoundSeconds * tryCount));
				break;
			} else {
				logger.debug(String.format("[pool]---> 等待%dms未分配到可用連線,繼續等待...", perRoundSeconds * tryCount));
			}
		}

		return conn;// 返回獲得的可用的連線
	}

	private void expansePool() {
		// 如果目前連線池中沒有可用的連線 建立一些連線
		createPrepareConn(this.incrSize);
	}

	/**
	 * 返回可用的連線
	 * 
	 * @return
	 */
	private synchronized PooledConn getFreeConn() {
		PooledConn pConn = null;
		pConn = freeConnQueue.poll();
		return pConn;// 返回找到到的可用連線
	}

	@Override
	public synchronized void closeConnection(PooledConn pooledConn) {
		freeConnQueue.offer(pooledConn);
	}

	/**
	 * 關閉連線
	 * 
	 * @param conn
	 */
	private void closeConn(TcpClientProxy conn) {
		conn.close();
	}

	/**
	 * 等待 單位 毫秒
	 * 
	 * @param mSeconds
	 */
	private void wait(int mSeconds) {
		try {
			Thread.sleep(mSeconds);
		} catch (InterruptedException e) {
		}
	}

	/**
	 * 釋放空閒連線
	 * 
	 */
	private void longTimeFreeConnectionCheck_ThreadRun() {
		// 確保連線池己創新存在
		PooledConn pConn = null;
		boolean bFind = false;

		Enumeration enumerate = this.loginedConnections.elements();
		while (enumerate.hasMoreElements()) {
			if (this.loginedConnections.size() <= this.initialSize) {
				break;
			}
			pConn = (PooledConn) enumerate.nextElement();
			long intIdleSeconds = pConn.getIdleSeconds();
			// 大於最大空閒時間
			if (intIdleSeconds > this.maxIdleTime * 1000) {
				// 從連線池向量中刪除它
				this.loginedConnections.removeElement(pConn);
				logger.info("[pool]---> 已登入連線池移除連線[" + pConn.getConn().getClientName() + "]");

				this.prepareConnections.remove(pConn);
				logger.info("[pool]---> 預分配連線池移除連線[" + pConn.getConn().getClientName() + "]");

				closeConn(pConn.getConn());

				bFind = true;
				PoolReturnValue poolReturnValue = getPoolListString(this.loginedConnections);

				logger.info("[pool]---> 超過最大空閒時間" + intIdleSeconds / 1000 + "s釋放空閒連線[" + pConn.getConn().getClientName()
						+ "],釋放後連線個數=" + poolReturnValue.getIntVal() + ", 當前連線池[" + poolReturnValue.getStrVal() + "]");
			}
		}

		// ------------ 存在釋放閒置連線的,才需要重新整理佇列 --------------
		if (bFind) {
			freeConnQueueLock.lock();
			freeConnQueue.clear();
			enumerate = this.loginedConnections.elements();
			while (enumerate.hasMoreElements()) {
				pConn = (PooledConn) enumerate.nextElement();
				freeConnQueue.offer(pConn);
			}

			logger.info("[pool]--<queue>---> 空閒連線佇列重新排列,佇列中連線個數:" + freeConnQueue.size() + ", 當前空閒連線佇列["
					+ getFreeQueueListString(freeConnQueue) + "]");
			freeConnQueueLock.unlock();
		}
	}

	private void freeQueue_append(PooledConn pConn) {
		this.freeConnQueue.offer(pConn);

		logger.info("[pool]--<queue>---> 空閒連線佇列,連線個數:" + freeConnQueue.size() + ", 佇列["
				+ getFreeQueueListString(this.freeConnQueue) + "]");
	}

	private void loginedConnectionsCheck_ThreadRun() {
		// 簽到完成的連線,加入到已登入連線池
		Enumeration enumerate = this.prepareConnections.elements();
		while (enumerate.hasMoreElements()) {
			PooledConn pConn = (PooledConn) enumerate.nextElement();
			if (pConn.isPooled() == false) {
				while (pConn.getConn().getLoginFlag() != 2) {
					wait(300);
				}

				this.loginedConnections.add(pConn);
				pConn.setPooled(true);

				PoolReturnValue poolReturnValue = getPoolListString(this.loginedConnections);
				logger.info("[pool]---> 連線池中增加一個新的連線,當前連線數量=" + poolReturnValue.getIntVal() + ", 當前連線池["
						+ poolReturnValue.getStrVal() + "]");

				freeQueue_append(pConn); // 加入佇列
			}
		}
	}

	private PoolReturnValue getPoolListString(Vector<PooledConn> connections) {
		PoolReturnValue ret = new PoolReturnValue();
		int count = 0;

		String stroutString = "";
		Enumeration enumerate = connections.elements();
		while (enumerate.hasMoreElements()) {
			PooledConn pConn = (PooledConn) enumerate.nextElement();
			stroutString += "|" + pConn.getConn().getClientName();
			count++;
		}

		ret.setStrVal(stroutString);
		ret.setIntVal(count);
		return ret;
	}

	private String getFreeQueueListString(Queue<PooledConn> freeConnQueueC) {
		String stroutString = "";

		LinkedList linkedList = (LinkedList) freeConnQueueC;
		for (Iterator iterator = linkedList.iterator(); iterator.hasNext();) {
			PooledConn pConn = (PooledConn) iterator.next();
			stroutString += "|" + pConn.getConn().getClientName();
		}

		return stroutString;
	}

}

連線

package net.sf.hservice_qrcode.secretkey.pool;

import java.util.Date;

import net.sf.hservice_qrcode.secretkey.tcp.client.TcpClientProxy;

/**
 * 
 * 內部使用的用於儲存連線池中連線物件的類 此類中有兩個成員,一個是資料庫的連線,另一個是指示此連線是否 正在使用的標誌。
 */
public class PooledConn {

	private TcpClientProxy conn = null;// 連線
	private boolean busy = false; // 此連線是否正在使用的標誌,預設沒有正在使用

	private Date lastActiveDate;
	private boolean isQueue;
	private boolean logined;
	private boolean pooled;
	
	public boolean isPooled() {
		return pooled;
	}

	public void setPooled(boolean pooled) {
		this.pooled = pooled;
	}

	public boolean isQueue() {
		return isQueue;
	}

	public void setQueue(boolean isQueue) {
		this.isQueue = isQueue;
	}

	public Date getLastActiveDate() {
		return lastActiveDate;
	}

	public void setLastActiveDate(Date lastActiveDate) {
		this.lastActiveDate = lastActiveDate;
	}

	/**
	 * 空閒時間,單位:秒
	 */
	public long getIdleSeconds() {
		long idleSeconds = 0;
		long curTime = new Date().getTime();
		idleSeconds = curTime - this.lastActiveDate.getTime();

		return idleSeconds;
	}

	// 建構函式,根據一個 Connection 構告一個 PooledConnection 物件
	public PooledConn(TcpClientProxy connection) {
		this.conn = connection;
		lastActiveDate = new Date();
		this.isQueue = false;
	}

	public TcpClientProxy getConn() {
		return conn;
	}

	public void setConn(TcpClientProxy conn) {
		this.conn = conn;
	}

	// 獲得物件連線是否忙
	public boolean isBusy() {
		return busy;
	}

	// 設定物件的連線正在忙
	public void setBusy(boolean busy) {
		this.busy = busy;
		lastActiveDate = new Date();
	}

	public boolean isLogined() {
		return logined;
	}

	public void setLogined(boolean logined) {
		this.logined = logined;
	}
}