1. 程式人生 > 實用技巧 >Java 之 JDBC執行緒池(原始碼版)

Java 之 JDBC執行緒池(原始碼版)

一、目錄

二、程式碼

PoolConstant

package cn.kgc.kb08.jdbc.dao3.impl;

public interface PoolConstant {
    String POOL_CORE_COUNT="coreCount";
    String POOL_MAX_COUNT="maxCount";
    String POOL_MAX_IDELE="maxIdel";
    String POOL_MAX_WAIT="maxWait";
    String POOL_RETRY_INTERVAL="retryInterval";
    String POOL_MAX_RETRY_COUNT
="maxRetryCount"; String POOL_EXIT_ON_ERR="exitOnErr"; String[] POOL={ POOL_CORE_COUNT, POOL_MAX_COUNT, POOL_MAX_IDELE, POOL_MAX_WAIT, POOL_RETRY_INTERVAL, POOL_MAX_RETRY_COUNT, POOL_EXIT_ON_ERR }; String MYSQL_DRI
="driver"; String MYSQL_URI="url"; String MYSQL_USER="username"; String MYSQL_PASS="password"; String[] MYSQL = { MYSQL_DRI, MYSQL_URI, MYSQL_USER, MYSQL_PASS }; }

PoolUtil

package cn.kgc.kb08.jdbc.dao3.impl;

import cn.kgc.kb08.jdbc.dao2.Dao;
import cn.kgc.kb08.jdbc.dao2.impl.BaseDao; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; public class PoolUtil { private static Dao dao; /** * 解析資料來源配置資訊 * @param dataSource 資料來源名稱 * @return Map<String,String> */ protected static <T>Map<String,T> parse(Class<T> c,String dataSource, List<String> items){ // File config = new File("config/sys.properties"); // Properties pro = new Properties(); // try { // pro.load(new FileInputStream(config)); // Map<String,T> map = new HashMap<>(items.size()); // for (String item : items) { // String key = dataSource+"."+ item; // if (!pro.containsKey(key)){ // throw new IOException("缺少配置專案"+item); // } // map.put(item,c.getConstructor(String.class).newInstance(pro.getProperty(key))); // } // } catch (Exception e) { // e.printStackTrace(); // System.out.println("資源配置缺失,系統強制退出"+e.getMessage()); // System.exit(-1); // }finally { // if(null!=pro){ // pro.clear(); // pro = null; // // } // } // // return null; File config = new File("config/sys.properties"); Properties pro = new Properties();//Properties是一個檔案 try { pro.load(new FileInputStream(config)); //final String[] items = {"driver", "url", "username", "password"}; Map<String,T> map = new HashMap<>(items.size()); for (String item : items) { String key = dataSource + "." + item; if (!pro.containsKey(key)) { throw new IOException("缺少配置項:" + item);//不包含,就是缺項了 } map.put(item,c.getConstructor(String.class).newInstance(pro.getProperty(key))); } return map; } catch (Exception e) { System.err.println(dataSource+"資料來源配置資訊異常,系統強制退出:" + e.getMessage()); System.exit(-1); } finally { if (null != pro) { pro.clear(); pro = null; } } return null; } protected static void close(AutoCloseable...acs){ for (AutoCloseable ac : acs) { if (null != ac) { try { ac.close(); } catch (Exception e) { e.printStackTrace(); } } }} }

***重點類**** ConPool

package cn.kgc.kb08.jdbc.dao3.impl;


import cn.kgc.kb08.jdbc.dao3.SelRtn;
import cn.kgc.kb08.jdbc.dao3.Dao;
import cn.kgc.kb08.jdbc.dao3.Pool;

import java.lang.reflect.Method;
import java.sql.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 彈性連線池:生產和管理物件的
 */
public final  class ConPool implements Pool {

//    pool.maxIdel=30是什麼 => 請看文件,官方會寫
//    pool.retryInterval=50
//    pool.maxRetryCount=8

    /**
     * 池中連線
     */
    class PoolCon {
        boolean free = true;
        boolean core;
        Connection con;
        long idleBegin;

        public PoolCon(boolean core, Connection con) {
            this.core = core;
            this.con = con;
            restIdle();
        }

        public void restIdle() {
            if (!core) {
                this.idleBegin = System.currentTimeMillis();
            }
        }


    }

    private ConcurrentMap<Integer, PoolCon> pool;
    private Map<String, Integer> cnfPool;
    private Map<String, String> cnfCon;
    /**
     * 執行定期清理執行緒池
     * 檢查核心連線物件的有效性,無效則建立新核心連線物件覆蓋
     * 檢查臨時連線物件是否超時,超時則關閉並移除
     */

    private ScheduledExecutorService schedule;
    private ExecutorService service;
    private Lock lock;
    private Condition cond;
    private boolean clearing;

    public ConPool() {
        initCnf();
        initPool();
        startClear();
    }

// 塞進pool和mysql的配置:比如Map中driver:xxx的鍵值對
    private void initCnf() {
        cnfPool = PoolUtil.parse(Integer.class, "pool",
                Arrays.asList(PoolConstant.POOL));
        cnfCon = PoolUtil.parse(String.class, "mysql01",
                Arrays.asList(PoolConstant.MYSQL));
    }


    //    初始化連線池
    private void initPool() {
        final int MAX_COUNT = cnfPool.get(PoolConstant.POOL_MAX_COUNT);
        service = Executors.newFixedThreadPool(MAX_COUNT * 2);
        schedule = Executors.newSingleThreadScheduledExecutor();
        lock = new ReentrantLock(true);
        cond = lock.newCondition();
        //分段鎖的集合
        pool = new ConcurrentHashMap<>(MAX_COUNT);
        // 池中連線
        PoolCon pc;
        final int CORE_COUNT = cnfPool.get(PoolConstant.POOL_CORE_COUNT);
        for (Integer i = 0, j = 1; i <= CORE_COUNT; i++) {
            pc = makePoolCon(true);
            if (null != pc) {
                // 給核心連線一個編號
                pool.put(j++, pc);
            }
        }
        if (pool.size() == 0) {
            System.err.println("連線池初始化失敗,系統強制退出");
            System.exit(-1);
        }
        // 如果配置讓你失敗便退出,且核心池數量小於一半
        if (cnfPool.get(PoolConstant.POOL_EXIT_ON_ERR) == 1
                && pool.size() <= CORE_COUNT / 2) {
            System.err.println("連線池初始化過半異常,系統強制退出");
            System.exit(-1);

        }
    }


    /**
     * 建立一個池中的連線物件
     *
     * @param core 池物件型別,true:核心物件,false:臨時物件
     * @return
     */

    private PoolCon makePoolCon(boolean core) {
        PoolCon pc = null;
        // 最大重試次數,建立n次,創建出一個連線物件
        for (int i = 0; i <= cnfPool.get(PoolConstant.POOL_MAX_RETRY_COUNT); i++) {
            try {
                Connection con = DriverManager.getConnection(
                        cnfCon.get(PoolConstant.MYSQL_URI),
                        cnfCon.get(PoolConstant.MYSQL_USER),
                        cnfCon.get(PoolConstant.MYSQL_PASS)
                );
                pc = new PoolCon(core, con);
            } catch (SQLException e) {
                try {
                    // 建立失敗就休息片刻再建立(重試)
                    TimeUnit.SECONDS.sleep(cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL));
                    continue;
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                    System.out.println("cuocuocuo");
                }
                e.printStackTrace();
            }
        }
        return pc;

    }

    /**
     * 驗證核心連線物件是否有效
     *
     * @param pc
     * @return
     */
    private boolean isPCValid(PoolCon pc) {
        try {
            pc.con.createStatement().executeQuery("select 1");
            return true;
        } catch (SQLException e) {
            return false;
        }
    }

    /**
     * 驗證臨時連線物件是否過期
     *
     * @param pc 池連線物件
     * @return true:過期,false:沒過期
     */
    private boolean isExpired(PoolCon pc) {
        return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - pc.idleBegin) >=
                cnfPool.get(PoolConstant.POOL_MAX_IDELE);
    }


    /**
     * 驗證使用者是否超出配置最大時限
     * @param waitBegin   計算參考起點時間
     * @return
     */
    private boolean isWaitExpired(long waitBegin){
        return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()-waitBegin)>=
                cnfPool.get(PoolConstant.POOL_MAX_WAIT);
    }

    /**
     * 開啟定期清理任務
     * maxIdle,最長閒置時間
     */
    private void startClear() {
        int delay = cnfPool.get(PoolConstant.POOL_MAX_IDELE);
        schedule.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                clearing = true;
                for (Integer key : pool.keySet()) {
                    PoolCon pc = pool.get(key);
                    if (!pc.free) {
                        continue;
                    }
                    if (pc.core) {
                        if (!isPCValid(pc)) {
                            pool.put(key, makePoolCon(true));
                        }
                    } else {
                        if (isExpired(pc) || !isPCValid(pc)) {
                            pool.remove(key);
                        }
                    }
                }


                clearing = false;
                cond.signalAll();
                lock.unlock();
            }
        }, delay, delay, TimeUnit.SECONDS);
    }


    /**
     * 連線池銷燬
     */
    @Override
    public void destory() {
        while (pool.size() > 0) {
            for (Integer key : pool.keySet()) {
                PoolCon pc = pool.get(key);
                if (pc.free) {
                    pc.free = false;
                    PoolUtil.close(pc.con);
                    pool.remove(key);
                }
            }
            try {
                TimeUnit.MILLISECONDS.sleep(cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }


    }


    /**
     *
     * @return
     */
    private PoolCon fetch() {
        long waitBegin = System.currentTimeMillis();

            for (Integer i = 0; i <= cnfPool.get(PoolConstant.POOL_MAX_RETRY_COUNT); i++) {
                try {
                    lock.lock();
                    if (clearing) {
                        cond.await();
                    }
                    for (Integer key : pool.keySet()) {
                        PoolCon pc = pool.get(key);
                        if (pc.free && isPCValid(pc)) {
                            pc.free = false;
                            return pc;
                        }
                    }
                    if(isWaitExpired(waitBegin)){
                        return null;
                    }
                    TimeUnit.MILLISECONDS.sleep(cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL));
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
            if(pool.size()< cnfPool.get(PoolConstant.POOL_MAX_COUNT)){
                   PoolCon pc = makePoolCon(false);
                   if (null != pc){
                       pc.free = false;
                      pool.put(pool.size()+1,pc);
                      return pc;
                   }
            }
        return null;
    }




    private  void  giveback(PoolCon pc){
        if(null==pc){
            return;
        }
        if(!pc.core){
            pc.restIdle();
        }
        pc.free = true;
    }



@Override
    public Dao newDao(){
        return new Dao() {
            private PreparedStatement getPst(Connection con, final String SQL, Object... params) throws SQLException {
                PreparedStatement pst = con.prepareStatement(SQL);
                if (null != params && params.length > 0) {
                    for (int i = 0; i < params.length; i++) {
                        pst.setObject(i + 1, params[i]);
                    }
                }
                return pst;
            }

            private int update(PreparedStatement pst) throws SQLException {
                return pst.executeUpdate();
            }

            private ResultSet query(PreparedStatement pst) throws SQLException {
                return pst.executeQuery();
            }

            private Map<String, Method> parseMethod(Class c) {
                Map<String, Method> mapMethod = new HashMap<>();
                final String PREFIX = "set";
                for (Method method : c.getDeclaredMethods()) {
                    String name = method.getName();
                    if (!name.startsWith(PREFIX)) {
                        continue;
                    }
                    name = name.substring(3);
                    name = name.substring(0, 1).toLowerCase() + name.substring(1);
                    mapMethod.put(name, method);
                }
                return mapMethod;
            }

            private String[] parseStruct(ResultSetMetaData md) throws SQLException {
                String[] names = new String[md.getColumnCount()];
                for (int i = 0; i < names.length; i++) {
                    names[i] = md.getColumnLabel(i + 1);
                }
                return names;
            }

            @Override
            public int exeUpd(final String SQL, final Object... params) {
                try {
                    return service.submit(new Callable<Integer>() {
                        @Override
                        public Integer call() throws Exception {
                            int rst = 0;
                            PoolCon pc = null;
                            //                        Connection con = null;
                            PreparedStatement pst = null;
                            try {
                                pc = fetch();
                                if (null != pc) {
                                    pst = getPst(pc.con, SQL, params);
                                    rst = update(pst);
                                }

                            } catch (SQLException e) {
                                rst = -1;
                            } finally {
                                PoolUtil.close(pst);
                                giveback(pc);
                            }
                            return rst;
                        }
                    }).get();
                } catch (Exception e) {
                    return -1;
                }

            }

            @Override
            public <T> SelRtn exeSingle(final Class<T> c, final String SQL, final Object... params) {
                try {
                    return service.submit(new Callable<SelRtn>() {
                        @Override
                        public SelRtn call() throws Exception {
                            PoolCon pc = null;
                            PreparedStatement pst = null;
                            ResultSet rst = null;
                            try {
                                pc = fetch();
                                pst = getPst(pc.con, SQL, params);
                                rst = query(pst);
                                if (null != rst && rst.next()) {
                                    //                呼叫型別(非Character基本型別包裝類)c的,帶有唯一字串引數的構造方法
                                    //                c.getConstructor(String.class)//基本型別建立物件
                                    return SelRtn.succeed(
                                            c.getConstructor(String.class).newInstance(rst.getObject(1).toString()));
                                } else {
                                    return SelRtn.succeed(null);
                                }
                            } catch (Exception e) {
                                e.printStackTrace();

                            } finally {
                                //                           close(rst, pst, con);
                                PoolUtil.close(rst, pst);
                                giveback(pc);
                            }
                            return SelRtn.fail();
                        }
                    }).get();
                } catch (Exception e) {
                    return SelRtn.fail();
                }
            }

            @Override
            public <T> SelRtn exeQuery(final Class<T> c, final String SQL, final Object... params) {
                try {
                    return service.submit(new Callable<SelRtn>() {
                        @Override
                        public SelRtn call() throws Exception {
                            PoolCon pc = null;
                            PreparedStatement pst = null;
                            ResultSet rst = null;
                            try {
                                pst = getPst(pc.con, SQL, params);
                                rst = query(pst);
                                if (null != rst && rst.next()) {
                                    List<T> list = new ArrayList<>();
                                    Map<String, Method> map = parseMethod(c);
                                    String[] names = parseStruct(rst.getMetaData());
                                    do {
                                        T t = c.newInstance();
                                        for (String name : names) {
                                            map.get(name).invoke(t, rst.getObject(name));
                                        }
                                        list.add(t);
                                    } while (rst.next());
                                    return SelRtn.succeed(list);
                                } else {
                                    return SelRtn.succeed(null);
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            } finally {
                                PoolUtil.close(rst, pst);
                                giveback(pc);
                            }
                            return SelRtn.fail();
                        }
                    }).get();
                } catch (Exception e) {
                    return SelRtn.fail();
                }
            }
        };


    }
}

Pool

public interface Pool {
    void destory();
    Dao newDao();
}

PoolFactory

package cn.kgc.kb08.jdbc.dao3;

import cn.kgc.kb08.jdbc.dao3.impl.ConPool;

public abstract class PoolFactory {
    private static Dao dao;
    private static synchronized  void init(){
        if(null==dao){
            dao = new ConPool().newDao();
        }
    }

    public static Dao get(){
        if(null==dao){
            init();
        }
        return dao;
    }
}

SelRtn

package cn.kgc.kb08.jdbc.dao3;

/**
 * 完善查詢操作返回型別,對於異常的缺失
 */
public final  class SelRtn {
    private boolean err = false;
    private Object rtn;

    public static SelRtn succeed(Object rtn){
        return new SelRtn(rtn);
    }
    public static SelRtn fail(){
        return new SelRtn();
    }



    private SelRtn(Object rtn) {
        this.rtn = rtn;
    }

    private SelRtn() {
        this.err = true;
    }

    public boolean isErr(){
        return this.err;
    }

    public <T> T getRtn(){
        return (T) rtn;
    }

}