
A look at the JDBC Statement fetchSize

When using the MySQL JDBC driver, a query with a very large result set can fail with java.lang.OutOfMemoryError: Java heap space, because by default the database server sends the entire result set to the Java client in one shot, and holding it all in memory causes the OOM.

To read a large amount of data with a single SQL statement through MySQL JDBC without triggering a JVM OOM, use one of the following approaches:
    1. Set the following properties on the statement to receive data in streaming mode: each call receives only part of the data from the server, until all of it has been processed, so no JVM OOM occurs.

setResultSetType(ResultSet.TYPE_FORWARD_ONLY);
setFetchSize(Integer.MIN_VALUE);

    2. Call the statement's enableStreamingResults method; internally it is just a wrapper around approach 1.

    3. Set the connection property useCursorFetch=true (supported since the 5.0 driver), open the statement as TYPE_FORWARD_ONLY, and then set a fetch size. This uses a server-side cursor, fetching fetch_size rows from the server at a time.

So the OOM problem can be solved like this:

ps = (PreparedStatement) con.prepareStatement("select * from bigTable",
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(Integer.MIN_VALUE);
ps.setFetchDirection(ResultSet.FETCH_REVERSE);
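Approach 3 above (a server-side cursor via useCursorFetch) could be sketched as below; the URL, credentials, table name, and fetch size are placeholders for illustration, not values from the article:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;

public class CursorFetchExample {
    // Connection properties enabling MySQL server-side cursors
    // (useCursorFetch is supported since Connector/J 5.0).
    static Properties cursorFetchProps() {
        Properties props = new Properties();
        props.setProperty("useCursorFetch", "true"); // use a server-side cursor
        props.setProperty("user", "app");            // placeholder credentials
        props.setProperty("password", "secret");
        return props;
    }

    public static void main(String[] args) throws SQLException {
        // Placeholder URL; adjust host/port/database for your environment.
        try (Connection con = DriverManager.getConnection(
                 "jdbc:mysql://localhost:3306/test", cursorFetchProps());
             PreparedStatement ps = con.prepareStatement(
                 "select * from bigTable",
                 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            ps.setFetchSize(1000); // each round trip pulls 1000 rows from the cursor
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    // process each row as it arrives
                }
            }
        }
    }
}
```

Unlike the Integer.MIN_VALUE streaming trick, a server-side cursor lets you pick a batch size that trades memory against round trips.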

 

-------------------------------------------------------------------------------------------------

Both the Statement and ResultSet interfaces have a setFetchSize method:

void setFetchSize(int rows) throws SQLException

From the API documentation:

The Statement interface describes it as:

Gives the JDBC driver a hint as to the number of rows that should be fetched from the database when more rows are needed for ResultSet objects generated by this Statement. The specified number of rows affects only result sets created using this statement. If the value specified is zero, the hint is ignored. The default value is zero.

The ResultSet interface describes it as:

Gives the JDBC driver a hint as to the number of rows that should be fetched from the database when more rows are needed for this ResultSet object. If the fetch size specified is zero, the JDBC driver ignores the value and is free to make its own best guess as to what the fetch size should be. The default value is set by the Statement object that created the result set. The fetch size may be changed at any time.

Excerpt 1, found online:

By default, the driver collects all the results for a query at once. This can be inconvenient for large data sets, so the JDBC driver provides a means of basing a ResultSet on a database cursor and fetching only a small number of rows at a time. A small block of rows is cached on the client side, and when it is exhausted, the next block of rows is retrieved by repositioning the cursor.

Excerpt 2:

setFetchSize is designed mainly to reduce the number of network round trips. When iterating a ResultSet, fetching only one row from the server at a time incurs heavy overhead. setFetchSize means: when rs.next is called, the ResultSet fetches that many rows from the server in one go, so subsequent rs.next calls can read straight from memory with no network round trip, improving efficiency. Note that some JDBC drivers may ignore this setting, and setting it too large increases memory usage.
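The round-trip saving can be made concrete with a rough model (an illustration, not part of any driver's source): iterating N rows through a server-side cursor costs roughly ceil(N / fetchSize) fetch round trips, versus N round trips when fetching row by row.

```java
public class RoundTrips {
    // Rough model of fetch round trips for a cursor-based result set:
    // the driver asks the server for fetchSize rows at a time.
    static long roundTrips(long totalRows, int fetchSize) {
        if (fetchSize <= 0) {
            return 1; // fetch size 0: the driver pulls everything in one go
        }
        return (totalRows + fetchSize - 1) / fetchSize; // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(roundTrips(1_000_000, 50));     // 20000
        System.out.println(roundTrips(1_000_000, 10_000)); // 100
    }
}
```

The two printed values show the trade-off directly: a fetch size of 10,000 needs 200x fewer round trips than 50, at the cost of a 200x larger client-side row buffer.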

------------------------------------------------------------------------------------------------------------

Source code analysis:

fetchSize

Here we take the postgres jdbc driver as the example, mainly because its source code is public and the naming is fairly clean. I previously read the oracle jdbc driver: with no source available, decompilation yields piles of variables named var1, var2, and so on, which is very hard to follow.

By default the pgjdbc driver pulls the entire result set in one go, at executeQuery time. For large queries this easily causes OOM. In that scenario you need to set fetchSize: the query first returns one batch, and after a batch has been consumed via next, the next batch is fetched.

But this comes with a few requirements:

  • The database must use the V3 protocol, i.e. PostgreSQL 7.4+.
  • The connection's autoCommit must be false, because with autoCommit on, the cursor is closed when the query completes, so no further fetches are possible. Also, the ResultSet must be of type ResultSet.TYPE_FORWARD_ONLY (the default), meaning it cannot be scrolled backwards.
  • The statement must be a single query, not multiple queries joined with semicolons.

Example code

@Test
public void testReadTimeout() throws SQLException {
    Connection connection = dataSource.getConnection();
    // https://jdbc.postgresql.org/documentation/head/query.html
    connection.setAutoCommit(false); // NOTE must be false for fetchSize to take effect

    String sql = "select * from demo_table";
    PreparedStatement pstmt;
    try {
        pstmt = (PreparedStatement) connection.prepareStatement(sql);
        pstmt.setFetchSize(50);
        System.out.println("ps.getQueryTimeout():" + pstmt.getQueryTimeout());
        System.out.println("ps.getFetchSize():" + pstmt.getFetchSize());
        System.out.println("ps.getFetchDirection():" + pstmt.getFetchDirection());
        System.out.println("ps.getMaxFieldSize():" + pstmt.getMaxFieldSize());

        ResultSet rs = pstmt.executeQuery();
        // NOTE once this returns, the statement has executed; by default the first fetchSize rows come back
        int col = rs.getMetaData().getColumnCount();
        System.out.println("============================");
        while (rs.next()) {
            for (int i = 1; i <= col; i++) {
                System.out.print(rs.getObject(i));
            }
            System.out.println("");
        }
        System.out.println("============================");
    } catch (SQLException e) {
        e.printStackTrace();
    } finally {
        // close resources
    }
}

Source code walkthrough

postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgPreparedStatement.java

/*
 * A Prepared SQL query is executed and its ResultSet is returned
 *
 * @return a ResultSet that contains the data produced by the * query - never null
 *
 * @exception SQLException if a database access error occurs
 */
public java.sql.ResultSet executeQuery() throws SQLException {
    if (!executeWithFlags(0)) {
      throw new PSQLException(GT.tr("No results were returned by the query."), PSQLState.NO_DATA);
    }

    if (result.getNext() != null) {
      throw new PSQLException(GT.tr("Multiple ResultSets were returned by the query."),
          PSQLState.TOO_MANY_RESULTS);
    }

    return result.getResultSet();
}

executeQuery first calls executeWithFlags. In the source the call is written directly inside the if condition, which is not a recommended style, since a call buried in an if is easy to overlook.

executeWithFlags

public boolean executeWithFlags(int flags) throws SQLException {
    try {
      checkClosed();

      if (connection.getPreferQueryMode() == PreferQueryMode.SIMPLE) {
        flags |= QueryExecutor.QUERY_EXECUTE_AS_SIMPLE;
      }

      execute(preparedQuery, preparedParameters, flags);

      return (result != null && result.getResultSet() != null);
    } finally {
      defaultTimeZone = null;
    }
}

protected final void execute(CachedQuery cachedQuery, ParameterList queryParameters, int flags)
      throws SQLException {
    try {
      executeInternal(cachedQuery, queryParameters, flags);
    } catch (SQLException e) {
      // Don't retry composite queries as it might get partially executed
      if (cachedQuery.query.getSubqueries() != null
          || !connection.getQueryExecutor().willHealOnRetry(e)) {
        throw e;
      }
      cachedQuery.query.close();
      // Execute the query one more time
      executeInternal(cachedQuery, queryParameters, flags);
    }
}

This calls execute, which in turn calls executeInternal.

executeInternal

postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgPreparedStatement.java

private void executeInternal(CachedQuery cachedQuery, ParameterList queryParameters, int flags)
      throws SQLException {
    closeForNextExecution();

    // Enable cursor-based resultset if possible.
    if (fetchSize > 0 && !wantsScrollableResultSet() && !connection.getAutoCommit()
        && !wantsHoldableResultSet()) {
      flags |= QueryExecutor.QUERY_FORWARD_CURSOR;
    }

    if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
      flags |= QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS;

      // If the no results flag is set (from executeUpdate)
      // clear it so we get the generated keys results.
      //
      if ((flags & QueryExecutor.QUERY_NO_RESULTS) != 0) {
        flags &= ~(QueryExecutor.QUERY_NO_RESULTS);
      }
    }

    if (isOneShotQuery(cachedQuery)) {
      flags |= QueryExecutor.QUERY_ONESHOT;
    }
    // Only use named statements after we hit the threshold. Note that only
    // named statements can be transferred in binary format.

    if (connection.getAutoCommit()) {
      flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
    }

    // updateable result sets do not yet support binary updates
    if (concurrency != ResultSet.CONCUR_READ_ONLY) {
      flags |= QueryExecutor.QUERY_NO_BINARY_TRANSFER;
    }

    Query queryToExecute = cachedQuery.query;

    if (queryToExecute.isEmpty()) {
      flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
    }

    if (!queryToExecute.isStatementDescribed() && forceBinaryTransfers
        && (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) {
      // Simple 'Q' execution does not need to know parameter types
      // When binaryTransfer is forced, then we need to know resulting parameter and column types,
      // thus sending a describe request.
      int flags2 = flags | QueryExecutor.QUERY_DESCRIBE_ONLY;
      StatementResultHandler handler2 = new StatementResultHandler();
      connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler2, 0, 0,
          flags2);
      ResultWrapper result2 = handler2.getResults();
      if (result2 != null) {
        result2.getResultSet().close();
      }
    }

    StatementResultHandler handler = new StatementResultHandler();
    result = null;
    try {
      startTimer();
      connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows,
          fetchSize, flags);
    } finally {
      killTimerTask();
    }
    result = firstUnclosedResult = handler.getResults();

    if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
      generatedKeys = result;
      result = result.getNext();

      if (wantsGeneratedKeysOnce) {
        wantsGeneratedKeysOnce = false;
      }
    }

  }

The key part is this call:

connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows,
          fetchSize, flags);

fetchSize is passed through, so only a result of that size is pulled. Ultimately sendExecute and processResults are called to fetch the data.

postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/v3/QueryExecutorImpl.java
 85 
private void sendExecute(SimpleQuery query, Portal portal, int limit) throws IOException {
    //
    // Send Execute.
    //

    if (logger.logDebug()) {
      logger.debug(" FE=> Execute(portal=" + portal + ",limit=" + limit + ")");
    }

    byte[] encodedPortalName = (portal == null ? null : portal.getEncodedPortalName());
    int encodedSize = (encodedPortalName == null ? 0 : encodedPortalName.length);

    // Total size = 4 (size field) + 1 + N (source portal) + 4 (max rows)
    pgStream.sendChar('E'); // Execute
    pgStream.sendInteger4(4 + 1 + encodedSize + 4); // message size
    if (encodedPortalName != null) {
      pgStream.send(encodedPortalName); // portal name
    }
    pgStream.sendChar(0); // portal name terminator
    pgStream.sendInteger4(limit); // row limit

    pendingExecuteQueue.add(new ExecuteRequest(query, portal, false));
  }

protected void processResults(ResultHandler handler, int flags) throws IOException {
    boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
    boolean bothRowsAndStatus = (flags & QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS) != 0;

    List<byte[][]> tuples = null;

    int c;
    boolean endQuery = false;

    // At the end of a command execution we have the CommandComplete
    // message to tell us we're done, but with a describeOnly command
    // we have no real flag to let us know we're done. We've got to
    // look for the next RowDescription or NoData message and return
    // from there.
    boolean doneAfterRowDescNoData = false;

    while (!endQuery) {
      c = pgStream.receiveChar();
      switch (c) {
        case 'A': // Asynchronous Notify
          receiveAsyncNotify();
          break;

        case '1': // Parse Complete (response to Parse)
          pgStream.receiveInteger4(); // len, discarded

          SimpleQuery parsedQuery = pendingParseQueue.removeFirst();
          String parsedStatementName = parsedQuery.getStatementName();
          //...
      }
  }
}

next

postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgResultSet.java
144 
public boolean next() throws SQLException {
    checkClosed();
    if (onInsertRow) {
      throw new PSQLException(GT.tr("Can''t use relative move methods while on the insert row."),
          PSQLState.INVALID_CURSOR_STATE);
    }
    if (current_row + 1 >= rows.size()) {
      if (cursor == null || (maxRows > 0 && row_offset + rows.size() >= maxRows)) {
        current_row = rows.size();
        this_row = null;
        rowBuffer = null;
        return false; // End of the resultset.
      }

      // Ask for some more data.
      row_offset += rows.size(); // We are discarding some data.

      int fetchRows = fetchSize;
      if (maxRows != 0) {
        if (fetchRows == 0 || row_offset + fetchRows > maxRows) {
          // Fetch would exceed maxRows, limit it.
          fetchRows = maxRows - row_offset;
        }
      }
      // Execute the fetch and update this resultset.
      connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);
      current_row = 0;

      // Test the new rows array.
      if (rows.isEmpty()) {
        this_row = null;
        rowBuffer = null;
        return false;
      }
    } else {
      current_row++;
    }
    initRowBuffer();
    return true;
  }

In next we can see: it first checks whether current_row + 1 is less than rows.size(); if so, it simply increments current_row. Otherwise the current batch of fetchSize rows has been fully consumed, and it either reports the end of the result set or pulls the next batch and resets current_row:

connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);

This call fetches the next batch of fetchRows rows.
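The batch-and-refill control flow of next can be sketched in isolation; FetchWindow below is an illustrative stand-in for PgResultSet (not pgjdbc code), with an Iterator of batches playing the role of the server-side cursor:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class FetchWindow {
    private final Iterator<List<String>> cursor;   // stand-in for the server-side cursor
    private List<String> rows = new ArrayList<>(); // current batch, like PgResultSet.rows
    private int currentRow = -1;                   // like PgResultSet.current_row

    FetchWindow(Iterator<List<String>> cursor) {
        this.cursor = cursor;
    }

    // Mirrors the control flow of PgResultSet.next(): advance within the
    // current batch; when it is exhausted, ask the cursor for the next batch.
    boolean next() {
        if (currentRow + 1 >= rows.size()) {
            if (!cursor.hasNext()) {
                return false; // end of the result set
            }
            rows = cursor.next(); // "Ask for some more data."
            currentRow = 0;
            return !rows.isEmpty();
        }
        currentRow++;
        return true;
    }

    String current() {
        return rows.get(currentRow);
    }

    public static void main(String[] args) {
        FetchWindow w = new FetchWindow(
            List.of(List.of("a", "b"), List.of("c")).iterator());
        while (w.next()) {
            System.out.println(w.current()); // prints a, b, c
        }
    }
}
```

Only one batch is ever resident in memory, which is exactly why fetchSize bounds the client's memory footprint.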
  • initRowBuffer

private void initRowBuffer() {
    this_row = rows.get(current_row);
    // We only need a copy of the current row if we're going to
    // modify it via an updatable resultset.
    if (resultsetconcurrency == ResultSet.CONCUR_UPDATABLE) {
      rowBuffer = new byte[this_row.length][];
      System.arraycopy(this_row, 0, rowBuffer, 0, this_row.length);
    } else {
      rowBuffer = null;
    }
  }

So after next advances, the row about to be consumed is placed into this_row, and copied into rowBuffer only when the result set is updatable.

Summary

For queries returning large data sets, setting fetchSize is essential; otherwise pulling everything at once easily causes OOM. But when using fetchSize, rows must be processed promptly while iterating the ResultSet, rather than collecting the entire result and only processing it afterwards.
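That advice can be sketched as follows, with an Iterator standing in for an open ResultSet being driven by next (the names are illustrative, not pgjdbc API):

```java
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.stream.IntStream;

public class StreamingConsumer {
    // Process each row as it is produced, keeping only O(1) state,
    // instead of accumulating every row in a List before processing.
    static long processStreaming(Iterator<int[]> rows, Consumer<int[]> handler) {
        long count = 0;
        while (rows.hasNext()) {         // analogous to rs.next()
            handler.accept(rows.next()); // handle immediately; the row can then be GC'd
            count++;
        }
        return count; // only a counter is retained, never the rows themselves
    }

    public static void main(String[] args) {
        Iterator<int[]> fakeRows =
            IntStream.range(0, 1_000).mapToObj(i -> new int[]{i}).iterator();
        System.out.println(processStreaming(fakeRows, row -> {})); // 1000
    }
}
```

If the handler instead appended each row to a List, the heap usage would grow with the result set size, defeating the point of fetchSize.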