java之實現自己的資料庫連線池
阿新 • • 發佈:2019-01-04
最近仿mybatis寫了一個自己的orm框架 專案已上傳到github上 https://github.com/skybluehhx/MYORM.git,既然是orm框架肯定需要事務管理器和資料庫連線池,下面將介紹我自己實現一個連線池 (主要藉助阻塞佇列)
首先定義一個介面,給出執行緒池的基本功能
package Pools; import java.sql.Connection; /** * Created by zoujianglin * 2018/8/25 0025. */ public interface Pool { //獲取連線池中的“連線” PoolConnection getPoolConnection(); boolean relasePoolConnection(PoolConnection connection); //連線池大小 int getPoolSize(); //銷燬連線池 boolean destroy(); //釋放連線,歸還連線,使用該方法並不是銷燬連線而是 //重新歸回執行緒池,共其他執行緒複用 boolean relaseConnection(Connection connection); }
在這裡我將連線池中的連線做了一個抽象,主要是方便後續擴充套件,如標定每個連線所屬的事務管理器等等
具體定義如下
package Pools; import java.sql.Connection; import java.util.concurrent.atomic.AtomicLong; /** * Created by zoujianglin * 2018/8/25 0025. * PoolConnection為連線池對連線的抽象 */ public class PoolConnection { //表示連線的唯一標識, private long id = 0; private AtomicLong Iid = new AtomicLong(0); // 維持著資料庫連線 private Connection connection; //標誌著該連線是否被其他執行緒使用 // false表示連線不可用,表明連線已經被佔用 //true表示該連線未被佔用,可以正常使用 private volatile boolean isAvailable; //管理該連線的事務管理器,只有從事務管理器中獲取的連線才會被設定 暫未使用 // private TransactionManage transactionManage; //建立時,不帶引數預設沒有被使用 public PoolConnection(Connection connection) { this(connection, true); } public PoolConnection(Connection connection, boolean isAvailable) { this.connection = connection; this.isAvailable = isAvailable; this.id = Iid.getAndIncrement(); } @Override public String toString() { return "PoolConnection{" + "connection=" + connection + ", isAvailable=" + isAvailable + "connection=" + id + '}'; } public long getId() { return id; } public Connection getConnection() { return connection; } public boolean isAvailable() { return isAvailable; } }
//接著是basePoolt提供了對連線池的基本實現 該類有點複雜,比較重要的方法一般都做了註釋,具體實現如下
package Pools; import ORMException.DestoryPoolException; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.sql.Connection; import java.sql.Driver; import java.sql.DriverManager; import java.sql.SQLException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * Created by zoujianglin * 2018/8/25 0025. * 提供連線池的基本功能 */ @Component public abstract class BasePool implements Pool { private static Logger logger = Logger.getLogger(BasePool.class); //資料來源,獲取連線資料庫的基本資訊 @Autowired private DataSource dataSource; //用來持有連線池連線 private final BlockingQueue<PoolConnection> blockingQueue; //資料庫持有連線數量,初始值為0; private AtomicInteger poolSize = new AtomicInteger(0); //擴容標誌,當為true時,表明連線池正在擴容, //一個執行緒池我們設定只有一個執行緒進行擴容,後面想了想多個執行緒同時擴容也沒事,所以註釋掉 //private AtomicBoolean isDilatation = new AtomicBoolean(false); //初始化標誌 只能被初始化一次 private AtomicBoolean isInitialization; //銷燬標誌 private AtomicBoolean isDestroy = new AtomicBoolean(false); //預設重試次數,當資料庫獲取連線失敗時,將會重試,預設 //重試次數為連線池規定連線數,當為0時將不會重試, private int tryTimes; //增長的步伐,從連線池中獲取連線時,如果沒有 //獲取到連線而連線數小於最大連線時將以該步長增長連線數 //預設步長為4 private int step; public BasePool(DataSource dataSource, BlockingQueue blockingQueue) { this(dataSource, blockingQueue, dataSource.getMinConnection(), 4); } public BasePool(DataSource dataSource, BlockingQueue blockingQueue, int step) { this(dataSource, blockingQueue, dataSource.getMinConnection(), step); } public BasePool(DataSource dataSource, BlockingQueue<PoolConnection> blockingQueue, int times, int step) { this.dataSource = dataSource; this.blockingQueue = blockingQueue; this.tryTimes = times; this.isInitialization = new AtomicBoolean(false); this.step = step; init(); } /** public PoolConnection getConnection(long timeout, TimeUnit unit) { return null; } **/ /** * 獲取連線,獲取連線時並不能保證當連線池尺寸小於設定的最大連線數時 * 能立馬獲取到連線,但能保證最終獲取連線 * * @return */ public PoolConnection getPoolConnection() { if (!isDestroy.get()) { try { PoolConnection poolConnection = blockingQueue.poll(); if (poolConnection == null) { //判斷執行緒池大小是否達到最大 if (poolSize.get() >= dataSource.getMaxConnection()) {//達到最大阻塞等待 poolConnection = blockingQueue.take(); } else { //if (isDilatation.compareAndSet(false, true)) { //進行判斷,並確保每次只有一個執行緒進行擴增操作 int currentSize; int afterSize; int newSize; while (true) { currentSize = poolSize.get(); afterSize = currentSize + step; newSize = afterSize > dataSource.getMaxConnection() ? dataSource.getMaxConnection() : afterSize; int addNum = newSize - currentSize; //確保只有一個執行緒進行擴容執行緒池 if (poolSize.compareAndSet(currentSize, newSize)) { for (int i = 0; i < addNum; i++) { //這裡為提高效率可以返回第一個PoolConnecton,其他連線操作可以交由另一個執行緒操作 blockingQueue.add(getNewOneConnection(dataSource.getUrl(), dataSource.getUserName(), dataSource.getPassword())); } break; } //isDilatation.set(false);看前面關於isDilatation屬性的介紹,後發現多執行緒擴容也沒事 } return blockingQueue.take(); //} else { //有執行緒在擴容,直接阻塞等待返回 // return blockingQueue.take(); //} } } return poolConnection; } catch (InterruptedException e) { e.printStackTrace(); throw new RuntimeException(e); } } throw new DestoryPoolException("連線池已被毀壞"); } public int getPoolSize() { return poolSize.get(); } public int getFreeConnectioNums() { return blockingQueue.size(); } /** * * 歸還連線, * * @param connection * @return */ public boolean relasePoolConnection(PoolConnection connection) { blockingQueue.add(connection); return true; } //釋放連線,正如你所料的 我們依靠poolSize來確保連線池大小 //所以在釋放連線失敗時,poolSize的值應該減一,為了避免被錯誤的 //使用,我們增加限制,傳入空值時,將會丟擲異常,這裡並沒有 //做強制的保證,如果使用者歸還的連線不是連線池中的連線 我們也會 //確保它歸還成功,錯誤的使用該方法該會造成連線池實際數量 //大於現有數量,其實可以使用一個ThrealLocal 儲存一個標誌 //當執行緒獲取連線時,將標誌設定為true,只有帶有標誌的執行緒才能歸還 //並且歸還後需要重新置為false,考慮到該執行緒池,是內建使用, //這裡並沒有實現該功能,如果需求特殊,後續考慮補加 public boolean relaseConnection(Connection connection) { if (connection == null) { throw new RuntimeException("請確保釋放的連線不為空"); } //在歸還前 確保歸還的連線可用, boolean falg = false; //poolSize 減一是否成功的標誌 try { if (connection.isClosed()) { //連線已關閉,但連線池大小小於規定最小數目 //連線釋放失敗,直接尺寸減一 poolSize.getAndDecrement(); falg = true; return false; } } catch (SQLException e) { e.printStackTrace(); } finally { if (!falg) { poolSize.getAndDecrement(); } } return relasePoolConnection(new PoolConnection(connection)); } public boolean destroy() { return false; } public void init() { //沒有被初始化才進行初始化 if (isInitialization.compareAndSet(false, true)) { logger.error("開始初始化執行緒池"); try { Class driver = Class.forName(dataSource.getDriverClassName()); DriverManager.registerDriver((Driver) driver.newInstance()); } catch (ClassNotFoundException e) { e.printStackTrace(); throw new RuntimeException(e + "資料庫驅動類錯誤"); } catch (SQLException e) { e.printStackTrace(); throw new RuntimeException(e + "註冊資料庫驅動失敗"); } catch (IllegalAccessException e) { e.printStackTrace(); throw new RuntimeException(e + "註冊資料庫驅動失敗"); } catch (InstantiationException e) { e.printStackTrace(); throw new RuntimeException(e + "註冊資料庫驅動失敗"); } String url = dataSource.getUrl(); String userName = dataSource.getUserName(); String password = dataSource.getPassword(); for (int i = 0; i < dataSource.getMinConnection(); i++) { try { Connection connection = DriverManager.getConnection(url, userName, password); blockingQueue.add(new PoolConnection(connection)); poolSize.getAndIncrement(); //將連線放入阻塞佇列 } catch (SQLException e) { logger.error("獲取一條資料庫連線失敗", e); //補入重試機制,確保資料庫連線能夠完成 while (tryTimes > 0) { try { Connection connection = DriverManager.getConnection(url, userName, password); blockingQueue.add(new PoolConnection(connection)); poolSize.getAndIncrement(); } catch (SQLException e1) { e1.printStackTrace(); logger.error("重試時獲取一條資料庫連線失敗", e); } tryTimes--; logger.error("重試次數剩餘" + tryTimes, e); } } } } else { logger.warn("執行緒池正在被或已經被初始化,一個執行緒池只能被初始化一次"); throw new RuntimeException("請確保執行緒池只被初始化一次"); } } private PoolConnection getNewOneConnection(String url, String userName, String password) { try { Connection connection = DriverManager.getConnection(url, userName, password); return new PoolConnection(connection); } catch (SQLException e) { e.printStackTrace(); while (tryTimes > 0) { //重試 try { Connection connection = DriverManager.getConnection(url, userName, password); return new PoolConnection(connection); } catch (SQLException e1) { e1.printStackTrace(); logger.error("重試時獲取一條資料庫連線失敗", e); } tryTimes--; logger.error("重試次數剩餘" + tryTimes, e); } throw new RuntimeException("重試次數用光,獲取連線失敗"); } } }
其中該類中涉及到的資料來源結構如下
package Pools; /** * Created by zoujianglin * 2018/8/25 0025. * <p> * 資料來源 保留著資料庫連線的基本配置 */ public class DataSource { private String driverClassName; private String userName; private String password; private String url; private int maxConnection = 10; private int minConnection = 20; private int timeout; public DataSource(){ } protected DataSource(String driverClassName, String userName, String password, String url) { this.driverClassName = driverClassName; this.userName = userName; this.password = password; this.url = url; } public void setDriverClassName(String driverClassName) { this.driverClassName = driverClassName; } public void setUserName(String userName) { this.userName = userName; } public void setPassword(String password) { this.password = password; } public void setUrl(String url) { this.url = url; } public String getDriverClassName() { return driverClassName; } public String getUserName() { return userName; } public String getPassword() { return password; } public String getUrl() { return url; } public int getMaxConnection() { return maxConnection; } public void setMaxConnection(int maxConnection) { this.maxConnection = maxConnection; } public int getMinConnection() { return minConnection; } public void setMinConnection(int minConnection) { this.minConnection = minConnection; } public int getTimeout() { return timeout; } public void setTimeout(int timeout) { this.timeout = timeout; } }
最後給出資料庫連線池的具體實現,預設連線池採用無界阻塞佇列
package Pools; import org.springframework.stereotype.Component; import java.util.concurrent.LinkedBlockingQueue; /** * Created by zoujianglin * 2018/8/25 0025. */ public class DefaultPool extends BasePool { public DefaultPool(DataSource dataSource) { super(dataSource, new LinkedBlockingQueue()); } public DefaultPool(DataSource dataSource, int times) { super(dataSource, new LinkedBlockingQueue(), times); } }