Java 之 JDBC執行緒池(原始碼版)
阿新 • • 發佈:2020-11-20
一、目錄
二、程式碼
PoolConstant
package cn.kgc.kb08.jdbc.dao3.impl; public interface PoolConstant { String POOL_CORE_COUNT="coreCount"; String POOL_MAX_COUNT="maxCount"; String POOL_MAX_IDELE="maxIdel"; String POOL_MAX_WAIT="maxWait"; String POOL_RETRY_INTERVAL="retryInterval"; String POOL_MAX_RETRY_COUNT="maxRetryCount"; String POOL_EXIT_ON_ERR="exitOnErr"; String[] POOL={ POOL_CORE_COUNT, POOL_MAX_COUNT, POOL_MAX_IDELE, POOL_MAX_WAIT, POOL_RETRY_INTERVAL, POOL_MAX_RETRY_COUNT, POOL_EXIT_ON_ERR }; String MYSQL_DRI="driver"; String MYSQL_URI="url"; String MYSQL_USER="username"; String MYSQL_PASS="password"; String[] MYSQL = { MYSQL_DRI, MYSQL_URI, MYSQL_USER, MYSQL_PASS }; }
PoolUtil
package cn.kgc.kb08.jdbc.dao3.impl; import cn.kgc.kb08.jdbc.dao2.Dao;import cn.kgc.kb08.jdbc.dao2.impl.BaseDao; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; public class PoolUtil { private static Dao dao; /** * 解析資料來源配置資訊 * @param dataSource 資料來源名稱 * @return Map<String,String> */ protected static <T>Map<String,T> parse(Class<T> c,String dataSource, List<String> items){ // File config = new File("config/sys.properties"); // Properties pro = new Properties(); // try { // pro.load(new FileInputStream(config)); // Map<String,T> map = new HashMap<>(items.size()); // for (String item : items) { // String key = dataSource+"."+ item; // if (!pro.containsKey(key)){ // throw new IOException("缺少配置專案"+item); // } // map.put(item,c.getConstructor(String.class).newInstance(pro.getProperty(key))); // } // } catch (Exception e) { // e.printStackTrace(); // System.out.println("資源配置缺失,系統強制退出"+e.getMessage()); // System.exit(-1); // }finally { // if(null!=pro){ // pro.clear(); // pro = null; // // } // } // // return null; File config = new File("config/sys.properties"); Properties pro = new Properties();//Properties是一個檔案 try { pro.load(new FileInputStream(config)); //final String[] items = {"driver", "url", "username", "password"}; Map<String,T> map = new HashMap<>(items.size()); for (String item : items) { String key = dataSource + "." + item; if (!pro.containsKey(key)) { throw new IOException("缺少配置項:" + item);//不包含,就是缺項了 } map.put(item,c.getConstructor(String.class).newInstance(pro.getProperty(key))); } return map; } catch (Exception e) { System.err.println(dataSource+"資料來源配置資訊異常,系統強制退出:" + e.getMessage()); System.exit(-1); } finally { if (null != pro) { pro.clear(); pro = null; } } return null; } protected static void close(AutoCloseable...acs){ for (AutoCloseable ac : acs) { if (null != ac) { try { ac.close(); } catch (Exception e) { e.printStackTrace(); } } }} }
***重點類**** ConPool
package cn.kgc.kb08.jdbc.dao3.impl; import cn.kgc.kb08.jdbc.dao3.SelRtn; import cn.kgc.kb08.jdbc.dao3.Dao; import cn.kgc.kb08.jdbc.dao3.Pool; import java.lang.reflect.Method; import java.sql.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 彈性連線池:生產和管理物件的 */ public final class ConPool implements Pool { // pool.maxIdel=30是什麼 => 請看文件,官方會寫 // pool.retryInterval=50 // pool.maxRetryCount=8 /** * 池中連線 */ class PoolCon { boolean free = true; boolean core; Connection con; long idleBegin; public PoolCon(boolean core, Connection con) { this.core = core; this.con = con; restIdle(); } public void restIdle() { if (!core) { this.idleBegin = System.currentTimeMillis(); } } } private ConcurrentMap<Integer, PoolCon> pool; private Map<String, Integer> cnfPool; private Map<String, String> cnfCon; /** * 執行定期清理執行緒池 * 檢查核心連線物件的有效性,無效則建立新核心連線物件覆蓋 * 檢查臨時連線物件是否超時,超時則關閉並移除 */ private ScheduledExecutorService schedule; private ExecutorService service; private Lock lock; private Condition cond; private boolean clearing; public ConPool() { initCnf(); initPool(); startClear(); } // 塞進pool和mysql的配置:比如Map中driver:xxx的鍵值對 private void initCnf() { cnfPool = PoolUtil.parse(Integer.class, "pool", Arrays.asList(PoolConstant.POOL)); cnfCon = PoolUtil.parse(String.class, "mysql01", Arrays.asList(PoolConstant.MYSQL)); } // 初始化連線池 private void initPool() { final int MAX_COUNT = cnfPool.get(PoolConstant.POOL_MAX_COUNT); service = Executors.newFixedThreadPool(MAX_COUNT * 2); schedule = Executors.newSingleThreadScheduledExecutor(); lock = new ReentrantLock(true); cond = lock.newCondition(); //分段鎖的集合 pool = new ConcurrentHashMap<>(MAX_COUNT); // 池中連線 PoolCon pc; final int CORE_COUNT = cnfPool.get(PoolConstant.POOL_CORE_COUNT); for (Integer i = 0, j = 1; i <= CORE_COUNT; i++) { pc = makePoolCon(true); if (null != pc) { // 給核心連線一個編號 pool.put(j++, pc); } } if (pool.size() == 0) { System.err.println("連線池初始化失敗,系統強制退出"); System.exit(-1); } // 如果配置讓你失敗便退出,且核心池數量小於一半 if (cnfPool.get(PoolConstant.POOL_EXIT_ON_ERR) == 1 && pool.size() <= CORE_COUNT / 2) { System.err.println("連線池初始化過半異常,系統強制退出"); System.exit(-1); } } /** * 建立一個池中的連線物件 * * @param core 池物件型別,true:核心物件,false:臨時物件 * @return */ private PoolCon makePoolCon(boolean core) { PoolCon pc = null; // 最大重試次數,建立n次,創建出一個連線物件 for (int i = 0; i <= cnfPool.get(PoolConstant.POOL_MAX_RETRY_COUNT); i++) { try { Connection con = DriverManager.getConnection( cnfCon.get(PoolConstant.MYSQL_URI), cnfCon.get(PoolConstant.MYSQL_USER), cnfCon.get(PoolConstant.MYSQL_PASS) ); pc = new PoolCon(core, con); } catch (SQLException e) { try { // 建立失敗就休息片刻再建立(重試) TimeUnit.SECONDS.sleep(cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL)); continue; } catch (InterruptedException e1) { e1.printStackTrace(); System.out.println("cuocuocuo"); } e.printStackTrace(); } } return pc; } /** * 驗證核心連線物件是否有效 * * @param pc * @return */ private boolean isPCValid(PoolCon pc) { try { pc.con.createStatement().executeQuery("select 1"); return true; } catch (SQLException e) { return false; } } /** * 驗證臨時連線物件是否過期 * * @param pc 池連線物件 * @return true:過期,false:沒過期 */ private boolean isExpired(PoolCon pc) { return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - pc.idleBegin) >= cnfPool.get(PoolConstant.POOL_MAX_IDELE); } /** * 驗證使用者是否超出配置最大時限 * @param waitBegin 計算參考起點時間 * @return */ private boolean isWaitExpired(long waitBegin){ return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()-waitBegin)>= cnfPool.get(PoolConstant.POOL_MAX_WAIT); } /** * 開啟定期清理任務 * maxIdle,最長閒置時間 */ private void startClear() { int delay = cnfPool.get(PoolConstant.POOL_MAX_IDELE); schedule.scheduleWithFixedDelay(new Runnable() { @Override public void run() { lock.lock(); clearing = true; for (Integer key : pool.keySet()) { PoolCon pc = pool.get(key); if (!pc.free) { continue; } if (pc.core) { if (!isPCValid(pc)) { pool.put(key, makePoolCon(true)); } } else { if (isExpired(pc) || !isPCValid(pc)) { pool.remove(key); } } } clearing = false; cond.signalAll(); lock.unlock(); } }, delay, delay, TimeUnit.SECONDS); } /** * 連線池銷燬 */ @Override public void destory() { while (pool.size() > 0) { for (Integer key : pool.keySet()) { PoolCon pc = pool.get(key); if (pc.free) { pc.free = false; PoolUtil.close(pc.con); pool.remove(key); } } try { TimeUnit.MILLISECONDS.sleep(cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL)); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * * @return */ private PoolCon fetch() { long waitBegin = System.currentTimeMillis(); for (Integer i = 0; i <= cnfPool.get(PoolConstant.POOL_MAX_RETRY_COUNT); i++) { try { lock.lock(); if (clearing) { cond.await(); } for (Integer key : pool.keySet()) { PoolCon pc = pool.get(key); if (pc.free && isPCValid(pc)) { pc.free = false; return pc; } } if(isWaitExpired(waitBegin)){ return null; } TimeUnit.MILLISECONDS.sleep(cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL)); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } if(pool.size()< cnfPool.get(PoolConstant.POOL_MAX_COUNT)){ PoolCon pc = makePoolCon(false); if (null != pc){ pc.free = false; pool.put(pool.size()+1,pc); return pc; } } return null; } private void giveback(PoolCon pc){ if(null==pc){ return; } if(!pc.core){ pc.restIdle(); } pc.free = true; } @Override public Dao newDao(){ return new Dao() { private PreparedStatement getPst(Connection con, final String SQL, Object... params) throws SQLException { PreparedStatement pst = con.prepareStatement(SQL); if (null != params && params.length > 0) { for (int i = 0; i < params.length; i++) { pst.setObject(i + 1, params[i]); } } return pst; } private int update(PreparedStatement pst) throws SQLException { return pst.executeUpdate(); } private ResultSet query(PreparedStatement pst) throws SQLException { return pst.executeQuery(); } private Map<String, Method> parseMethod(Class c) { Map<String, Method> mapMethod = new HashMap<>(); final String PREFIX = "set"; for (Method method : c.getDeclaredMethods()) { String name = method.getName(); if (!name.startsWith(PREFIX)) { continue; } name = name.substring(3); name = name.substring(0, 1).toLowerCase() + name.substring(1); mapMethod.put(name, method); } return mapMethod; } private String[] parseStruct(ResultSetMetaData md) throws SQLException { String[] names = new String[md.getColumnCount()]; for (int i = 0; i < names.length; i++) { names[i] = md.getColumnLabel(i + 1); } return names; } @Override public int exeUpd(final String SQL, final Object... params) { try { return service.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { int rst = 0; PoolCon pc = null; // Connection con = null; PreparedStatement pst = null; try { pc = fetch(); if (null != pc) { pst = getPst(pc.con, SQL, params); rst = update(pst); } } catch (SQLException e) { rst = -1; } finally { PoolUtil.close(pst); giveback(pc); } return rst; } }).get(); } catch (Exception e) { return -1; } } @Override public <T> SelRtn exeSingle(final Class<T> c, final String SQL, final Object... params) { try { return service.submit(new Callable<SelRtn>() { @Override public SelRtn call() throws Exception { PoolCon pc = null; PreparedStatement pst = null; ResultSet rst = null; try { pc = fetch(); pst = getPst(pc.con, SQL, params); rst = query(pst); if (null != rst && rst.next()) { // 呼叫型別(非Character基本型別包裝類)c的,帶有唯一字串引數的構造方法 // c.getConstructor(String.class)//基本型別建立物件 return SelRtn.succeed( c.getConstructor(String.class).newInstance(rst.getObject(1).toString())); } else { return SelRtn.succeed(null); } } catch (Exception e) { e.printStackTrace(); } finally { // close(rst, pst, con); PoolUtil.close(rst, pst); giveback(pc); } return SelRtn.fail(); } }).get(); } catch (Exception e) { return SelRtn.fail(); } } @Override public <T> SelRtn exeQuery(final Class<T> c, final String SQL, final Object... params) { try { return service.submit(new Callable<SelRtn>() { @Override public SelRtn call() throws Exception { PoolCon pc = null; PreparedStatement pst = null; ResultSet rst = null; try { pst = getPst(pc.con, SQL, params); rst = query(pst); if (null != rst && rst.next()) { List<T> list = new ArrayList<>(); Map<String, Method> map = parseMethod(c); String[] names = parseStruct(rst.getMetaData()); do { T t = c.newInstance(); for (String name : names) { map.get(name).invoke(t, rst.getObject(name)); } list.add(t); } while (rst.next()); return SelRtn.succeed(list); } else { return SelRtn.succeed(null); } } catch (Exception e) { e.printStackTrace(); } finally { PoolUtil.close(rst, pst); giveback(pc); } return SelRtn.fail(); } }).get(); } catch (Exception e) { return SelRtn.fail(); } } }; } }
Pool
public interface Pool { void destory(); Dao newDao(); }
PoolFactory
package cn.kgc.kb08.jdbc.dao3; import cn.kgc.kb08.jdbc.dao3.impl.ConPool; public abstract class PoolFactory { private static Dao dao; private static synchronized void init(){ if(null==dao){ dao = new ConPool().newDao(); } } public static Dao get(){ if(null==dao){ init(); } return dao; } }
SelRtn
package cn.kgc.kb08.jdbc.dao3; /** * 完善查詢操作返回型別,對於異常的缺失 */ public final class SelRtn { private boolean err = false; private Object rtn; public static SelRtn succeed(Object rtn){ return new SelRtn(rtn); } public static SelRtn fail(){ return new SelRtn(); } private SelRtn(Object rtn) { this.rtn = rtn; } private SelRtn() { this.err = true; } public boolean isErr(){ return this.err; } public <T> T getRtn(){ return (T) rtn; } }