1. 程式人生 > >Mybatis中使用流式查詢避免資料量過大導致OOM

Mybatis中使用流式查詢避免資料量過大導致OOM

一、前言

前面介紹了裸露JDBC 方式使用流式程式設計,下面介紹下MYbatis中兩種使用流式查詢方法

二、Mybaits中MyBatisCursorItemReader的使用

2.1 配置

  • MyBatisCursorItemReader的注入

<bean id="myMyBatisCursorItemReader" class="org.mybatis.spring.batch.MyBatisCursorItemReader">
    <property name="sqlSessionFactory" ref="sqlSessionFactory" />
    <property name="queryId"
value="com.taobao.accs.mass.petadata.dal.sqlmap.AccsDeviceInfoDAOMapper.selectByExampleForPetaData" /> </bean>

其中queryId為mapper檔案中介面名稱。

  • Mapper.xml設定



image.png

image.png

其中fetchSize=”-2147483648″,Integer.MIN_VALUE=-2147483648

2.2 使用

static void testCursor1() throws UnexpectedInputException, ParseException, Exception 
{ try { Map<String, Object> param = new HashMap<String, Object>(); AccsDeviceInfoDAOExample accsDeviceInfoDAOExample = new AccsDeviceInfoDAOExample(); accsDeviceInfoDAOExample.createCriteria().andAppKeyEqualTo("12345").andAppVersionEqualTo("5.7.2.4.5"
) .andPackageNameEqualTo("com.test.zlx"); param.put("oredCriteria", accsDeviceInfoDAOExample.getOredCriteria()); // 設定引數 myMyBatisCursorItemReader.setParameterValues(param); // 建立遊標 myMyBatisCursorItemReader.open(new ExecutionContext()); //使用遊標迭代獲取每個記錄 Long count = 0L; AccsDeviceInfoDAO accsDeviceInfoDAO; while ((accsDeviceInfoDAO = myMyBatisCursorItemReader.read()) != null) { System.out.println(JSON.toJSONString(accsDeviceInfoDAO)); ++count; System.out.println(count); } } catch (Exception e) { System.out.println("error:" + e.getLocalizedMessage()); } finally { // do some myMyBatisCursorItemReader.close(); } }

2.3 原理簡單介紹

  • open函式

作用從session工廠獲取一個session,然後呼叫session的selectCursor,它最終會呼叫

ConnectionImpl的prepareStatement方法:

public java.sql.PreparedStatement prepareStatement(String sql) throws SQLException {
    return prepareStatement(sql, DEFAULT_RESULT_SET_TYPE, DEFAULT_RESULT_SET_CONCURRENCY);
}
private static final int DEFAULT_RESULT_SET_TYPE = ResultSet.TYPE_FORWARD_ONLY;

private static final int DEFAULT_RESULT_SET_CONCURRENCY = ResultSet.CONCUR_READ_ONLY;

至此三個條件滿足了兩個,在加上我們自己設定的fetchSize就通知mysql要建立流式ResultSet。
那麼fectchsize何處設定那?

image.png

image.png

圖中1建立prepareStatement,2設定fetchSize.

設定後最後會呼叫MysqlIO的sqlQueryDirect方法執行具體sql並把結果resultset存放到JDBC4PrepardStatement中。

  • read函式

read函式作用是從結果集resultset中獲取資料,首先呼叫.next判斷是否有資料,有的話則讀取資料。
這和純粹JDBC程式設計方式就一樣了,只是read函式對其進行了包裝。

三、Mybatis中ResultHandler的使用

3.1 配置

  • Mapper.xml設定



image.png

image.png

其中fetchSize=”-2147483648″,Integer.MIN_VALUE=-2147483648

3.2 使用

static void testCursor2() {

    SqlSession session = sqlSessionFactory.openSession();
    Map<String, Object> param = new HashMap<String, Object>();
    
    AccsDeviceInfoDAOExample accsDeviceInfoDAOExample = new AccsDeviceInfoDAOExample();
    accsDeviceInfoDAOExample.createCriteria().andAppKeyEqualTo("12345").andAppVersionEqualTo("1.2.3.4")
            .andPackageNameEqualTo("com.hello.test");

    param.put("oredCriteria", accsDeviceInfoDAOExample.getOredCriteria());

    session.select("com.taobao.accs.mass.petadata.dal.sqlmap.AccsDeviceInfoDAOMapper.selectByExampleForPetaData",
            param, new ResultHandler() {

                @Override
                public void handleResult(ResultContext resultContext) {
                    AccsDeviceInfoDAO accsDeviceInfoDAO = (AccsDeviceInfoDAO) resultContext.getResultObject();

                    System.out.println(resultContext.getResultCount());
                    System.out.println(JSON.toJSONString(accsDeviceInfoDAO));

                }

            });

}

3.3 原理簡單介紹

類似第三節,只是第三節返回了操作ResultSet的遊標讓使用者自己迭代獲取資料,而現在是內部直接操作ResultSet逐條獲取資料並呼叫回撥handler的handleResult方法進行處理。

  private void handleRowValuesForSimpleResultMap(ResultSetWrapper rsw, ResultMap resultMap, ResultHandler<?> resultHandler, RowBounds rowBounds, ResultMapping parentMapping)
      throws SQLException {
    DefaultResultContext<Object> resultContext = new DefaultResultContext<Object>();
    skipRows(rsw.getResultSet(), rowBounds);
    while (shouldProcessMoreRows(resultContext, rowBounds) && rsw.getResultSet().next()) {
      ResultMap discriminatedResultMap = resolveDiscriminatedResultMap(rsw.getResultSet(), resultMap, null);
      Object rowValue = getRowValue(rsw, discriminatedResultMap);
      storeObject(resultHandler, resultContext, rowValue, parentMapping, rsw.getResultSet());
    }
  }

  private void storeObject(ResultHandler<?> resultHandler, DefaultResultContext<Object> resultContext, Object rowValue, ResultMapping parentMapping, ResultSet rs) throws SQLException {
    if (parentMapping != null) {
      linkToParents(rs, parentMapping, rowValue);
    } else {
      callResultHandler(resultHandler, resultContext, rowValue);
    }
  }
   
  //呼叫回撥
  @SuppressWarnings("unchecked" /* because ResultHandler<?> is always ResultHandler<Object>*/)
  private void callResultHandler(ResultHandler<?> resultHandler, DefaultResultContext<Object> resultContext, Object rowValue) {
    resultContext.nextResultObject(rowValue);
    ((ResultHandler<Object>) resultHandler).handleResult(resultContext);
  }

四、總結與結果對比

流式程式設計使用裸露JDBC程式設計最簡單,靈活,但是sql語句需要分散寫到需要呼叫資料庫操作的地方,不便於維護,Mybatis底層還是使用裸露JDBC程式設計API實現的,並且使用xml檔案統一管理sql語句,雖然解析執行時候會有點開銷(比如每次呼叫都是反射進行的),但是同時還提供了快取。

對於同等條件下搜尋結果為600萬條記錄的時候使用遊標與不使用時候記憶體佔用對比:

  • 非流式

    image.png

    image.png
  • 流式

    貼上圖片.png

    貼上圖片.png

可知非流式時候記憶體會隨著搜出來的記錄增長而近乎直線增長,流式時候則比較平穩,另外非流式由於需要mysql伺服器準備全部資料,所以呼叫後不會馬上返回,需要根據資料量大小不同會等待一段時候才會返回,這時候呼叫方執行緒會阻塞,流式則因為每次返回一條記錄,所以返回速度會很快。

這裡在總結下:client傳送select請求給Server後,Server根據條件篩選符合條件的記錄,然後就會把記錄傳送到自己的傳送buffer,等buffer滿了就flush快取(這裡要注意的是如果client的接受快取滿了,那麼Server的傳送就會阻塞主,直到client的接受快取空閒。),通過網路傳送到client的接受快取,當不用遊標時候MySqIo就會從接受快取裡面逐個讀取記錄到resultset。就這樣client 從自己的接受快取讀取資料到resultset,同時Server端不斷通過網路向client接受快取傳送資料,直到所有記錄都放到了resultset。

如果使用了遊標,則使用者呼叫resultset的next的頻率決定了Server傳送時候的阻塞情況,如果使用者呼叫next快,那麼client的接受快取就會有空閒,那麼Server就會把資料傳送過來,如果使用者呼叫的慢,那麼由於接受快取騰不出來,Server的傳送就會阻塞


加多

加多

高階 Java 攻城獅 at 阿里巴巴加多,目前就職於阿里巴巴,熱衷併發程式設計、ClassLoader,Spring等開源框架,分散式RPC框架dubbo,springcloud等;愛好音樂,運動。微信公眾號:技術原始積累。知識星球賬號:技術原始積累