1. 程式人生 > 實用技巧 >Mybatis資料來源結構解析之連線池

Mybatis資料來源結構解析之連線池

對於 ORM 框架而言,資料來源的組織是一個非常重要的一部分,這直接影響到框架的效能問題。本文將通過對 MyBatis 框架的資料來源結構進行詳盡的分析,找出什麼時候建立 Connection ,並且深入解析 MyBatis 的連線池。


本章的組織結構:

  • 零、什麼是連線池和執行緒池
  • 一、MyBatis 資料來源 DataSource 分類
  • 二、資料來源 DataSource 的建立過程
  • 三、 DataSource 什麼時候建立 Connection 物件
  • 四、不使用連線池的 UnpooledDataSource
  • 五、使用了連線池的 PooledDataSource

連線池和執行緒池

連線池:(降低物理連線損耗)

  • 1、連線池是面向資料庫連線的
  • 2、連線池是為了優化資料庫連線資源
  • 3、連線池有點類似在客戶端做優化

資料庫連線是一項有限的昂貴資源,一個數據庫連線物件均對應一個物理資料庫連線,每次操作都開啟一個物理連線,使用完都關閉連線,這樣造成系統的效能低下。 資料庫連線池的解決方案是在應用程式啟動時建立足夠的資料庫連線,並將這些連線組成一個連線池,由應用程式動態地對池中的連線進行申請、使用和釋放。對於多於連線池中連線數的併發請求,應該在請求佇列中排隊等待。並且應用程式可以根據池中連線的使用率,動態增加或減少池中的連線數。


執行緒池:(降低執行緒建立銷燬損耗)

  • 1、執行緒池是面向後臺程式的
  • 2、執行緒池是是為了提高記憶體和CPU效率
  • 3、執行緒池有點類似於在服務端做優化

執行緒池是一次性建立一定數量的執行緒(應該可以配置初始執行緒數量的),當用請求過來不用去建立新的執行緒,直接使用已建立的執行緒,使用後又放回到執行緒池中。
避免了頻繁建立執行緒,及銷燬執行緒的系統開銷,提高是記憶體和CPU效率。

相同點:

都是事先準備好資源,避免頻繁建立和銷燬的代價。

資料來源的分類

在Mybatis體系中,分為3DataSource

開啟Mybatis原始碼找到datasource包,可以看到3個子package

  • UNPOOLED 不使用連線池的資料來源

  • POOLED 使用連線池的資料來源

  • JNDI 使用JNDI

    實現的資料來源

MyBatis內部分別定義了實現了java.sql.DataSource介面的UnpooledDataSource,PooledDataSource類來表示UNPOOLED、POOLED型別的資料來源。 如下圖所示:

  • PooledDataSource和UnpooledDataSrouce都實現了java.sql.DataSource介面.
  • PooledDataSource持有一個UnPooledDataSource的引用,當PooledDataSource要建立Connection例項時,實際還是通過UnPooledDataSource來建立的.(PooledDataSource)只是提供一種快取連線池機制.

JNDI型別的資料來源DataSource,則是通過JNDI上下文中取值。

資料來源 DataSource 的建立過程

在mybatis的XML配置檔案中,使用元素來配置資料來源:

<!-- 配置資料來源(連線池) -->
<dataSource type="POOLED"> //這裡 type 屬性的取值就是為POOLED、UNPOOLED、JNDI
  <property name="driver" value="${jdbc.driver}"/>
  <property name="url" value="${jdbc.url}"/>
  <property name="username" value="${jdbc.username}"/>
  <property name="password" value="${jdbc.password}"/>
</dataSource>

MyBatis在初始化時,解析此檔案,根據<dataSource>的type屬性來建立相應型別的的資料來源DataSource,即:

  • type=”POOLED” :建立PooledDataSource例項

  • type=”UNPOOLED” :建立UnpooledDataSource例項

  • type=”JNDI” :從JNDI服務上查詢DataSource例項


    Mybatis是通過工廠模式來建立資料來源物件的 我們來看看原始碼:

    public interface DataSourceFactory {
    
    void setProperties(Properties props);
    
    DataSource getDataSource();//生產DataSource
    
    }
    

    上述3種類型的資料來源,對應有自己的工廠模式,都實現了這個DataSourceFactory

MyBatis建立了DataSource例項後,會將其放到Configuration物件內的Environment物件中, 供以後使用。

注意dataSource 此時只會儲存好配置資訊.連線池此時並沒有建立好連線.只有當程式在呼叫操作資料庫的方法時,才會初始化連線.

DataSource什麼時候建立Connection物件

我們需要建立SqlSession物件並需要執行SQL語句時,這時候MyBatis才會去呼叫dataSource物件來建立java.sql.Connection物件。也就是說,java.sql.Connection物件的建立一直延遲到執行SQL語句的時候。

例子:

  @Test
  public void testMyBatisBuild() throws IOException {
      Reader reader = Resources.getResourceAsReader("mybatis-config.xml");
      SqlSessionFactory factory = new SqlSessionFactoryBuilder().build(reader);
      SqlSession sqlSession = factory.openSession();
      TestMapper mapper = sqlSession.getMapper(TestMapper.class);
      Ttest one = mapper.getOne(1L);//直到這一行,才會去建立一個數據庫連線!
      System.out.println(one);
      sqlSession.close();
  }

口說無憑,跟進原始碼看看他們是在什麼時候建立的...

跟進原始碼,驗證Datasource 和Connection物件建立時機

驗證Datasource建立時機
  • 上面我們已經知道,pooled資料來源實際上也是使用的unpooled的例項,那麼我們在UnpooledDataSourceFactory的
    getDataSource方法的原始碼中做一些修改 並執行測試用例:
@Override
public DataSource getDataSource() {//此方法是UnpooledDataSourceFactory實現DataSourceFactory複寫
  System.out.println("建立了資料來源");
  System.out.println(dataSource.toString());
  return dataSource;
}

結論:在建立完SqlsessionFactory時,DataSource例項就建立了.


驗證Connection建立時機

首先我們先查出現在資料庫的所有連線數,在資料庫中執行

SELECT * FROM performance_schema.hosts;

返回資料: 顯示當前連線數為1,總連線數70

show full processlist; //顯示所有的任務列表

返回:當前只有一個查詢的連線在執行

重新啟動專案,在執行到需要執行實際的sql操作時,可以看到他已經被代理增強了

直到此時,連線數還是沒有變,說明連線還沒有建立,我們接著往下看.

我們按F7進入方法,可以看到,他被代理,,這時候會執行到之前的代理方法中呼叫invoke方法.這裡有一個判斷,但是並不成立,於是進入cachedInvoker(method).invoke()方法代理執行一下操作

cachedInvoker(method).invoke()方法

  
    @Override
    public Object invoke(Object proxy, Method method, Object[] args, SqlSession sqlSession) throws Throwable {
      return mapperMethod.execute(sqlSession, args);
    }

繼續F7進入方法,由於我們是單條查詢select 所以會case進入select塊中的selectOne

繼續F7

繼續F7

通過configuration.getMappedStatement獲取MappedStatement

單步步過,F8後,進入executor.query方法

@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
  BoundSql boundSql = ms.getBoundSql(parameterObject);
  CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
  return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}

繼續走到底,F7進入query方法

此時,會去快取中查詢,這裡的快取是二級快取物件 ,生命週期是mapper級別的(一級快取是一個session級別的),因為我們此時是第一次執行程式,所以肯定為Null,這時候會直接去查詢,呼叫delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql)方法,F7進入這個方法

二級快取沒有獲取到,又去查詢了一級快取,發現一級快取也沒有,這個時候,才去查資料庫

queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);//沒有快取則去db查

F7進入queryFromDatabase方法.看到是一些對一級快取的操作,我們主要看doQuery方法F7進入它.

可以看到它準備了一個空的Statement

我們F7跟進看一下prepareStatement方法 ,發現他呼叫了getConnection,哎!有點眼熟了,繼續F7進入getConnection()方法,

又是一個getConnection()....繼續F7進入transaction.getConnection()方法

又是一個getConnection()方法.判斷connection是否為空.為空openConnection()否則直接返回connection;我們F7繼續跟進openConnection()方法

  protected void openConnection() throws SQLException {
  if (log.isDebugEnabled()) {
    log.debug("Opening JDBC Connection");
  }
  connection = dataSource.getConnection();//最終獲取連線的地方在這句.
  if (level != null) {
    connection.setTransactionIsolation(level.getLevel());//設定隔離等級
  }
  setDesiredAutoCommit(autoCommit);//是否自動提交,預設false,update不會提交到資料庫,需要手動commit
}

dataSource.getConnection()執行完,至此一個connection才建立完成.
我們驗證一下 在dataSource.getConnection()時打一下斷點.

此時資料庫中的連線數依然沒變 還是1

我們按F8 執行一步

在控制檯可以看到connection = com.mysql.jdbc.JDBC4Connection@1500b2f3 例項建立完畢 我們再去資料庫中看看連線數

兩個連線分別是:

不使用連線池的 UnpooledDataSource

的type屬性被配置成了”UNPOOLED”,MyBatis首先會例項化一個UnpooledDataSourceFactory工廠例項,然後通過.getDataSource()方法返回一個UnpooledDataSource例項物件引用,我們假定為dataSource。
使用UnpooledDataSource的getConnection(),每呼叫一次就會產生一個新的Connection例項物件。

UnPooledDataSource的getConnection()方法實現如下:

/*
UnpooledDataSource的getConnection()實現
*/
public Connection getConnection() throws SQLException
{
  return doGetConnection(username, password);
}

private Connection doGetConnection(String username, String password) throws SQLException
{
  //封裝username和password成properties
  Properties props = new Properties();
  if (driverProperties != null)
  {
      props.putAll(driverProperties);
  }
  if (username != null)
  {
      props.setProperty("user", username);
  }
  if (password != null)
  {
      props.setProperty("password", password);
  }
  return doGetConnection(props);
}

/*
*  獲取資料連線
*/
private Connection doGetConnection(Properties properties) throws SQLException
{
  //1.初始化驅動
  initializeDriver();
  //2.從DriverManager中獲取連線,獲取新的Connection物件
  Connection connection = DriverManager.getConnection(url, properties);
  //3.配置connection屬性
  configureConnection(connection);
  return connection;
}

UnpooledDataSource會做以下幾件事情:

    1. 初始化驅動: 判斷driver驅動是否已經載入到記憶體中,如果還沒有載入,則會動態地載入driver類,並例項化一個Driver物件,使用DriverManager.registerDriver()方法將其註冊到記憶體中,以供後續使用。
    1. 建立Connection物件: 使用DriverManager.getConnection()方法建立連線。
    1. 配置Connection物件: 設定是否自動提交autoCommit和隔離級別isolationLevel。
    1. 返回Connection物件。

我們每呼叫一次getConnection()方法,都會通過DriverManager.getConnection()返回新的java.sql.Connection例項,這樣當然對於資源是一種浪費,為了防止重複的去建立和銷燬連線,於是引入了連線池的概念.

使用了連線池的 PooledDataSource

要理解連線池,首先要了解它對於connection的容器,它使用PoolState容器來管理所有的conncetion


在PoolState中,它將connection分為兩種狀態,空閒狀態(idle)活動狀態(active),他們分別被儲存到PoolState容器內的idleConnectionsactiveConnections兩個ArrayList中

  • idleConnections:空閒(idle)狀態PooledConnection物件被放置到此集合中,表示當前閒置的沒有被使用的PooledConnection集合,呼叫PooledDataSource的getConnection()方法時,會優先從此集合中取PooledConnection物件。當用完一個java.sql.Connection物件時,MyBatis會將其包裹成PooledConnection物件放到此集合中。

  • activeConnections:活動(active)狀態的PooledConnection物件被放置到名為activeConnections的ArrayList中,表示當前正在被使用的PooledConnection集合,呼叫PooledDataSource的getConnection()方法時,會優先從idleConnections集合中取PooledConnection物件,如果沒有,則看此集合是否已滿,如果未滿,PooledDataSource會創建出一個PooledConnection,新增到此集合中,並返回。

從連線池中獲取一個連線物件的過程

下面讓我們看一下PooledDataSource 的popConnection方法獲取Connection物件的實現:

private PooledConnection popConnection(String username, String password) throws SQLException {
    boolean countedWait = false;
    PooledConnection conn = null;
    long t = System.currentTimeMillis();
    int localBadConnectionCount = 0;

    while (conn == null) {
      synchronized (state) {//給state物件加鎖
        if (!state.idleConnections.isEmpty()) {//如果空閒列表不空,就從空閒列表中拿connection
          // Pool has available connection
          conn = state.idleConnections.remove(0);//拿出空閒列表中的第一個,去驗證連線是否還有效
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        } else {
          // 空閒連線池中沒有可用的連線,就來看看活躍連線列表中是否有..先判斷活動連線總數 是否小於 最大可用的活動連線數
          if (state.activeConnections.size() < poolMaximumActiveConnections) {
            // 如果連線數小於list.size 直接建立新的連線.
            conn = new PooledConnection(dataSource.getConnection(), this);
            if (log.isDebugEnabled()) {
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }
          } else {
            // 此時連線數也滿了,不能建立新的連線. 找到最老的那個,檢查它是否過期
            //計算它的校驗時間,如果校驗時間大於連線池規定的最大校驗時間,則認為它已經過期了
            // 利用這個PoolConnection內部的realConnection重新生成一個PooledConnection
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              // 可以要求過期這個連線.
              state.claimedOverdueConnectionCount++;
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              state.accumulatedCheckoutTime += longestCheckoutTime;
              state.activeConnections.remove(oldestActiveConnection);
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                try {
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  /*
                     Just log a message for debug and continue to execute the following
                     statement like nothing happened.
                     Wrap the bad connection with a new PooledConnection, this will help
                     to not interrupt current executing thread and give current thread a
                     chance to join the next competition for another valid/good database
                     connection. At the end of this loop, bad {@link @conn} will be set as null.
                   */
                  log.debug("Bad connection. Could not roll back");
                }
              }
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
              conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            } else {
              //如果不能釋放,則必須等待
              // Must wait
              try {
                if (!countedWait) {
                  state.hadToWaitCount++;
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                long wt = System.currentTimeMillis();
                state.wait(poolTimeToWait);
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                break;
              }
            }
          }
        }
        if (conn != null) {
          // ping to server and check the connection is valid or not
          if (conn.isValid()) {//去驗證連線是否還有效.
            if (!conn.getRealConnection().getAutoCommit()) {
              conn.getRealConnection().rollback();
            }
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            state.activeConnections.add(conn);
            state.requestCount++;
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            state.badConnectionCount++;
            localBadConnectionCount++;
            conn = null;
            if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }

    }

    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }

    return conn;
  }

如上所示,對於PooledDataSource的getConnection()方法內,先是呼叫類PooledDataSource的popConnection()方法返回了一個PooledConnection物件,然後呼叫了PooledConnection的getProxyConnection()來返回Connection物件。

複用連線的過程

如果我們使用了連線池,我們在用完了Connection物件時,需要將它放在連線池中,該怎樣做呢? 如果讓我們來想的話,應該是通過代理Connection物件,在呼叫close時,並不真正關閉,而是丟到管理連線的容器中去. 要驗證這個想法 那麼 來看看Mybatis幫我們怎麼實現複用連線的.

class PooledConnection implements InvocationHandler {

  //......
  //所建立它的datasource引用
  private PooledDataSource dataSource;
  //真正的Connection物件
  private Connection realConnection;
  //代理自己的代理Connection
  private Connection proxyConnection;

  //......
}

PooledConenction實現了InvocationHandler介面,並且,proxyConnection物件也是根據這個它來生成的代理物件:

public PooledConnection(Connection connection, PooledDataSource dataSource) {
    this.hashCode = connection.hashCode();
    this.realConnection = connection;//真實連線
    this.dataSource = dataSource;
    this.createdTimestamp = System.currentTimeMillis();
    this.lastUsedTimestamp = System.currentTimeMillis();
    this.valid = true;
    this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
  }

實際上,我們呼叫PooledDataSource的getConnection()方法返回的就是這個proxyConnection物件。
當我們呼叫此proxyConnection物件上的任何方法時,都會呼叫PooledConnection物件內invoke()方法。
讓我們看一下PooledConnection類中的invoke()方法定義:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    //當呼叫關閉的時候,回收此Connection到PooledDataSource中
    if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
      dataSource.pushConnection(this);
      return null;
    } else {
      try {
        if (!Object.class.equals(method.getDeclaringClass())) {
          checkConnection();
        }
        return method.invoke(realConnection, args);
      } catch (Throwable t) {
        throw ExceptionUtil.unwrapThrowable(t);
      }
    }
  }

結論:當我們使用了pooledDataSource.getConnection()返回的Connection物件的close()方法時,不會呼叫真正Connection的close()方法,而是將此Connection物件放到連線池中。呼叫dataSource.pushConnection(this)實現

protected void pushConnection(PooledConnection conn) throws SQLException {

    synchronized (state) {
      state.activeConnections.remove(conn);
      if (conn.isValid()) {
        if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          state.idleConnections.add(newConn);
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          conn.invalidate();
          if (log.isDebugEnabled()) {
            log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
          }
          state.notifyAll();
        } else {
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          conn.getRealConnection().close();
          if (log.isDebugEnabled()) {
            log.debug("Closed connection " + conn.getRealHashCode() + ".");
          }
          conn.invalidate();
        }
      } else {
        if (log.isDebugEnabled()) {
          log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
        }
        state.badConnectionCount++;
      }
    }
  }

關注公眾號:java寶典