Java自定義連線池
阿新 • • 發佈:2021-06-22
連線池
package net.sf.hservice_qrcode.secretkey.pool; import java.util.Enumeration; import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; import java.util.Timer; import java.util.TimerTask; import java.util.Vector; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sf.hservice_qrcode.secretkey.tcp.client.TcpClientProxy; public class ConnPoolImpl implements ConnPool { protected transient final Logger logger = LoggerFactory.getLogger(this.getClass()); // --------------------- 連線池 ----------------- private int initialSize = 2; // 連線池的初始大小 private int maxSize = 10; // 連線池最大的大小 private int incrSize = 1; // 連線池自動增加的大小 private long maxIdleTime = 30; // 最大空閒時間,超時釋放連線,單位:秒 private boolean autoExpanse = false; // 啟動自動擴容 // ------------------- 擴容策略 ----------------- private int perRoundSeconds = 30; // 每輪100毫秒 private int expansionTimeout = 100 * 4; // 擴容超時,單位: 毫秒 private int totalWaitTimeout = 1000 * 5; // 總檯等待時間,單位:毫秒 private ConnParam connParam; private boolean inited = false; /** * 預準備連線 */ private Vector<PooledConn> prepareConnections; /** * 已登入連線 */ private Vector<PooledConn> loginedConnections; /** * 空閒連線佇列 */ private Queue<PooledConn> freeConnQueue = new LinkedList<PooledConn>(); private Lock freeConnQueueLock = new ReentrantLock(); /** * 已登入連線檢查 定時器 */ private Timer timerPool_LoginedConnectionCheck; private long timerPool_LoginedConnectionCheck_period = 100; // 單位:毫秒 /** * 長時間空閒釋放 定時器 */ private Timer timerPool_LongTimeFreeConnectionCheck; private long timerPool_LongTimeFreeConnectionCheck_period = 100; // 單位:毫秒 /** * 建構函式 */ public ConnPoolImpl(ConnParam connectParam,ConnPoolParam connPoolParam) { connParam = connectParam; this.initialSize = connPoolParam.getInitialSize(); this.maxSize = connPoolParam.getMaxSize(); this.incrSize = connPoolParam.getIncrSize(); this.maxIdleTime = connPoolParam.getMaxIdleTime(); this.autoExpanse = connPoolParam.isAutoExpanse(); logger.info(String.format("當前處理器核數: %d" , Runtime.getRuntime().availableProcessors())); logger.info(String.format("連線池引數: %s, 連線引數: %s", connPoolParam.toString(), connParam.toString())); } /** * 建立連線池 */ @Override public void createPool() { if (inited) { logger.warn("連線池已經初始化,請勿多次初始化"); return; } this.prepareConnections = new Vector<PooledConn>(); this.loginedConnections = new Vector<PooledConn>(); // 建立初始連線 createPrepareConn(this.initialSize); // -------------- 登入連線檢測 --------- this.timerPool_LoginedConnectionCheck = new Timer(); long firstTime = 1000 * 5; // 延時30秒啟動 timerPool_LoginedConnectionCheck.schedule(new TimerTask() { @Override public void run() { loginedConnectionsCheck_ThreadRun(); } }, firstTime, timerPool_LoginedConnectionCheck_period); // -------------- 閒置連線檢測 --------- timerPool_LongTimeFreeConnectionCheck = new Timer(); firstTime = 1000 * 5; // 延時30秒啟動 timerPool_LongTimeFreeConnectionCheck.schedule(new TimerTask() { @Override public void run() { // 釋放空閒連線 longTimeFreeConnectionCheck_ThreadRun(); } }, firstTime, this.timerPool_LongTimeFreeConnectionCheck_period); inited = true; } /** * 建立連線 * * @param count */ private synchronized void createPrepareConn(int count) { for (int x = 0; x < count; x++) { if (this.prepareConnections.size() >= this.maxSize) { logger.info("[pool]---> 連線池中連線數已到達極限[" + this.maxSize + "],不再擴容"); break; } TcpClientProxy conn = newConn(); PooledConn pooledConn = new PooledConn(conn); pooledConn.setBusy(false); pooledConn.setPooled(false); pooledConn.setQueue(false); pooledConn.setLogined(false); this.prepareConnections.addElement(pooledConn); PoolReturnValue poolReturnValue = getPoolListString(this.prepareConnections); logger.info("[pool]---> 預分配連線池新增連線,當前連線池中連線個數=" + poolReturnValue.getIntVal() + ", 當前連線池[" + poolReturnValue.getStrVal() + "]"); } } /** * 建立一個新的連線並返回 * * @return */ private synchronized TcpClientProxy newConn() { TcpClientProxy conn = null; String clientName = String.format("%s-%02d", connParam.getClientName(), this.prepareConnections.size()); conn = new TcpClientProxy(); conn.SetParam(clientName, connParam.getHost(), connParam.getPort(), connParam.getTermNo(), connParam.getTermMasterKey(), connParam.getConnet_timeout(), connParam.getRecv_timeout()); conn.start(); return conn; // 返回建立的新的資料庫連線 } /** * 返回一個可用連線 * * @return */ @Override public PooledConn getConnection() { boolean bNeedNewConn = false; boolean isExpansed = false; // 確保連線池己被建立 if (!inited) { logger.warn("連線池未初始化完成,返回可用連線null"); return null; // 連線池還沒建立,則返回 null } PooledConn conn = getFreeConn(); // 獲得一個可用的資料庫連線如果目前沒有可以使用的連線,即所有的連線都在使用中 int tryCount = 0; while (conn == null) { // ------------- 自動擴容 --------------- if (this.autoExpanse && !isExpansed) { // 等待時間大於擴容超時,進行擴容 if (perRoundSeconds * tryCount > expansionTimeout) { bNeedNewConn = true; if (bNeedNewConn) { logger.info(String.format("[pool]---> 等待超過%dms, 觸發連線池擴容策略,進行擴容", perRoundSeconds * tryCount)); expansePool(); isExpansed = true; } } } // // 等待時間大於最大等待時間,返回失敗 // if (perRoundSeconds * tryCount > totalWaitTimeout) { // logger.info(String.format("[pool]---> 等待%ds,長時間未分到可用的連線,連線分配失敗返回連線null", // perRoundSeconds * tryCount / 1000)); // break; // } tryCount++; wait(perRoundSeconds); conn = getFreeConn(); // 重新再試,直到獲得可用的連線,如果則表明建立一批連線後也不可獲得可用連線 if (conn != null) { logger.info(String.format("[pool]---> 等待%dms, 連線分配成功", perRoundSeconds * tryCount)); break; } else { logger.debug(String.format("[pool]---> 等待%dms未分配到可用連線,繼續等待...", perRoundSeconds * tryCount)); } } return conn;// 返回獲得的可用的連線 } private void expansePool() { // 如果目前連線池中沒有可用的連線 建立一些連線 createPrepareConn(this.incrSize); } /** * 返回可用的連線 * * @return */ private synchronized PooledConn getFreeConn() { PooledConn pConn = null; pConn = freeConnQueue.poll(); return pConn;// 返回找到到的可用連線 } @Override public synchronized void closeConnection(PooledConn pooledConn) { freeConnQueue.offer(pooledConn); } /** * 關閉連線 * * @param conn */ private void closeConn(TcpClientProxy conn) { conn.close(); } /** * 等待 單位 毫秒 * * @param mSeconds */ private void wait(int mSeconds) { try { Thread.sleep(mSeconds); } catch (InterruptedException e) { } } /** * 釋放空閒連線 * */ private void longTimeFreeConnectionCheck_ThreadRun() { // 確保連線池己創新存在 PooledConn pConn = null; boolean bFind = false; Enumeration enumerate = this.loginedConnections.elements(); while (enumerate.hasMoreElements()) { if (this.loginedConnections.size() <= this.initialSize) { break; } pConn = (PooledConn) enumerate.nextElement(); long intIdleSeconds = pConn.getIdleSeconds(); // 大於最大空閒時間 if (intIdleSeconds > this.maxIdleTime * 1000) { // 從連線池向量中刪除它 this.loginedConnections.removeElement(pConn); logger.info("[pool]---> 已登入連線池移除連線[" + pConn.getConn().getClientName() + "]"); this.prepareConnections.remove(pConn); logger.info("[pool]---> 預分配連線池移除連線[" + pConn.getConn().getClientName() + "]"); closeConn(pConn.getConn()); bFind = true; PoolReturnValue poolReturnValue = getPoolListString(this.loginedConnections); logger.info("[pool]---> 超過最大空閒時間" + intIdleSeconds / 1000 + "s釋放空閒連線[" + pConn.getConn().getClientName() + "],釋放後連線個數=" + poolReturnValue.getIntVal() + ", 當前連線池[" + poolReturnValue.getStrVal() + "]"); } } // ------------ 存在釋放閒置連線的,才需要重新整理佇列 -------------- if (bFind) { freeConnQueueLock.lock(); freeConnQueue.clear(); enumerate = this.loginedConnections.elements(); while (enumerate.hasMoreElements()) { pConn = (PooledConn) enumerate.nextElement(); freeConnQueue.offer(pConn); } logger.info("[pool]--<queue>---> 空閒連線佇列重新排列,佇列中連線個數:" + freeConnQueue.size() + ", 當前空閒連線佇列[" + getFreeQueueListString(freeConnQueue) + "]"); freeConnQueueLock.unlock(); } } private void freeQueue_append(PooledConn pConn) { this.freeConnQueue.offer(pConn); logger.info("[pool]--<queue>---> 空閒連線佇列,連線個數:" + freeConnQueue.size() + ", 佇列[" + getFreeQueueListString(this.freeConnQueue) + "]"); } private void loginedConnectionsCheck_ThreadRun() { // 簽到完成的連線,加入到已登入連線池 Enumeration enumerate = this.prepareConnections.elements(); while (enumerate.hasMoreElements()) { PooledConn pConn = (PooledConn) enumerate.nextElement(); if (pConn.isPooled() == false) { while (pConn.getConn().getLoginFlag() != 2) { wait(300); } this.loginedConnections.add(pConn); pConn.setPooled(true); PoolReturnValue poolReturnValue = getPoolListString(this.loginedConnections); logger.info("[pool]---> 連線池中增加一個新的連線,當前連線數量=" + poolReturnValue.getIntVal() + ", 當前連線池[" + poolReturnValue.getStrVal() + "]"); freeQueue_append(pConn); // 加入佇列 } } } private PoolReturnValue getPoolListString(Vector<PooledConn> connections) { PoolReturnValue ret = new PoolReturnValue(); int count = 0; String stroutString = ""; Enumeration enumerate = connections.elements(); while (enumerate.hasMoreElements()) { PooledConn pConn = (PooledConn) enumerate.nextElement(); stroutString += "|" + pConn.getConn().getClientName(); count++; } ret.setStrVal(stroutString); ret.setIntVal(count); return ret; } private String getFreeQueueListString(Queue<PooledConn> freeConnQueueC) { String stroutString = ""; LinkedList linkedList = (LinkedList) freeConnQueueC; for (Iterator iterator = linkedList.iterator(); iterator.hasNext();) { PooledConn pConn = (PooledConn) iterator.next(); stroutString += "|" + pConn.getConn().getClientName(); } return stroutString; } }
連線
package net.sf.hservice_qrcode.secretkey.pool; import java.util.Date; import net.sf.hservice_qrcode.secretkey.tcp.client.TcpClientProxy; /** * * 內部使用的用於儲存連線池中連線物件的類 此類中有兩個成員,一個是資料庫的連線,另一個是指示此連線是否 正在使用的標誌。 */ public class PooledConn { private TcpClientProxy conn = null;// 連線 private boolean busy = false; // 此連線是否正在使用的標誌,預設沒有正在使用 private Date lastActiveDate; private boolean isQueue; private boolean logined; private boolean pooled; public boolean isPooled() { return pooled; } public void setPooled(boolean pooled) { this.pooled = pooled; } public boolean isQueue() { return isQueue; } public void setQueue(boolean isQueue) { this.isQueue = isQueue; } public Date getLastActiveDate() { return lastActiveDate; } public void setLastActiveDate(Date lastActiveDate) { this.lastActiveDate = lastActiveDate; } /** * 空閒時間,單位:秒 */ public long getIdleSeconds() { long idleSeconds = 0; long curTime = new Date().getTime(); idleSeconds = curTime - this.lastActiveDate.getTime(); return idleSeconds; } // 建構函式,根據一個 Connection 構告一個 PooledConnection 物件 public PooledConn(TcpClientProxy connection) { this.conn = connection; lastActiveDate = new Date(); this.isQueue = false; } public TcpClientProxy getConn() { return conn; } public void setConn(TcpClientProxy conn) { this.conn = conn; } // 獲得物件連線是否忙 public boolean isBusy() { return busy; } // 設定物件的連線正在忙 public void setBusy(boolean busy) { this.busy = busy; lastActiveDate = new Date(); } public boolean isLogined() { return logined; } public void setLogined(boolean logined) { this.logined = logined; } }