On the fetchSize of JDBC Statement
When using MySQL through JDBC, a query with a large result set may throw java.lang.OutOfMemoryError: Java heap space, because the database server sends the entire result set to the Java client at once and it is buffered in memory, causing the OOM.
To read a large amount of data with a single SQL statement through MySQL JDBC without a JVM OOM, you can use one of the following approaches:
1. Configure the statement with the properties below. The driver then streams the results, receiving only part of the data from the server at a time until all rows have been processed, so no JVM OOM occurs.
```java
setResultSetType(ResultSet.TYPE_FORWARD_ONLY);
setFetchSize(Integer.MIN_VALUE);
```
2. Call the statement's enableStreamingResults method; internally, enableStreamingResults simply applies approach 1.
3. Set the connection property useCursorFetch=true (supported since the 5.0 driver), open the statement as TYPE_FORWARD_ONLY, and then set a fetch size. This uses a server-side cursor, fetching fetch_size rows from the server at a time.
So the OOM problem can be avoided like this:
```java
ps = (PreparedStatement) con.prepareStatement("select * from bigTable",
    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(Integer.MIN_VALUE);
ps.setFetchDirection(ResultSet.FETCH_REVERSE);
```
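For approach 3, the useCursorFetch property is typically appended to the JDBC URL (the host, port, and database name below are placeholders), and the batch size is then chosen per statement via Statement.setFetchSize:

```
jdbc:mysql://localhost:3306/test?useCursorFetch=true
```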
-------------------------------------------------------------------------------------------------
Both the Statement and ResultSet interfaces declare a setFetchSize method:
```java
void setFetchSize(int rows) throws SQLException
```
Looking at the API documentation, the Statement interface explains it as follows:
Gives the JDBC driver a hint as to the number of rows that should be fetched from the database when more rows are needed for ResultSet objects generated by this Statement. The number of rows specified affects only result sets created using this statement. If the value specified is zero, the hint is ignored. The default value is zero.
The ResultSet interface explains it as follows:
Gives the JDBC driver a hint as to the number of rows that should be fetched from the database when more rows are needed for this ResultSet object. If the fetch size specified is zero, the JDBC driver ignores the value and is free to make its own best guess as to what the fetch size should be. The default value is set by the Statement object that created the result set. The fetch size may be changed at any time.
Excerpt 1, found online:
By default, the driver fetches all results of a query at once. This can be inconvenient for large data sets, so the JDBC driver provides a way to base a ResultSet on a database cursor and fetch only a few rows at a time. A small number of rows is cached on the client side of the connection, and when that block is exhausted, the next block of rows is retrieved by repositioning the cursor.
Excerpt 2:
setFetchSize exists mainly to reduce the number of network round trips. If the ResultSet fetched only one row from the server at a time, each access would incur significant overhead. setFetchSize controls how many rows the ResultSet pulls from the server in one go when rs.next() needs more data; subsequent rs.next() calls can then be served directly from memory without a network round trip, improving efficiency. Note that some JDBC drivers may ignore this setting, and setting it too large increases memory usage.
------------------------------------------------------------------------------------------------------------
Source code analysis:
fetchSize
Here we use the postgres jdbc driver as the example, mainly because pgjdbc's source is publicly available and its naming is fairly clean. I previously looked at the oracle jdbc driver; without source, the decompiled code is full of variable names like var1 and var2 and is very hard to follow. By default, the pgjdbc driver pulls the entire result set at once, inside executeQuery. For large queries this easily causes an OOM. In that scenario you need to set fetchSize: the query returns the first batch, and after next() has consumed a batch, the next batch is fetched.
However, this comes with a few requirements:
- The database must use the V3 protocol, i.e. PostgreSQL 7.4+
- The connection's autoCommit must be false, because with autoCommit enabled the cursor is closed when the query completes, so no further fetches are possible. In addition, the ResultSet must be of type ResultSet.TYPE_FORWARD_ONLY (the default), which means it cannot scroll backwards.
- The query must be a single statement, not multiple statements joined with semicolons
Sample code
```java
@Test
public void testReadTimeout() throws SQLException {
    Connection connection = dataSource.getConnection();
    // https://jdbc.postgresql.org/documentation/head/query.html
    connection.setAutoCommit(false); // NOTE autoCommit must be false for fetchSize to take effect

    String sql = "select * from demo_table";
    PreparedStatement pstmt;
    try {
        pstmt = (PreparedStatement) connection.prepareStatement(sql);
        pstmt.setFetchSize(50);
        System.out.println("ps.getQueryTimeout():" + pstmt.getQueryTimeout());
        System.out.println("ps.getFetchSize():" + pstmt.getFetchSize());
        System.out.println("ps.getFetchDirection():" + pstmt.getFetchDirection());
        System.out.println("ps.getMaxFieldSize():" + pstmt.getMaxFieldSize());

        ResultSet rs = pstmt.executeQuery();
        // NOTE returning here means the statement has executed; by default one fetchSize batch is returned
        int col = rs.getMetaData().getColumnCount();
        System.out.println("============================");
        while (rs.next()) {
            for (int i = 1; i <= col; i++) {
                System.out.print(rs.getObject(i));
            }
            System.out.println("");
        }
        System.out.println("============================");
    } catch (SQLException e) {
        e.printStackTrace();
    } finally {
        // close resources
    }
}
```
Source code walkthrough
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgPreparedStatement.java
```java
/*
 * A Prepared SQL query is executed and its ResultSet is returned
 *
 * @return a ResultSet that contains the data produced by the query - never null
 *
 * @exception SQLException if a database access error occurs
 */
public java.sql.ResultSet executeQuery() throws SQLException {
    if (!executeWithFlags(0)) {
        throw new PSQLException(GT.tr("No results were returned by the query."), PSQLState.NO_DATA);
    }

    if (result.getNext() != null) {
        throw new PSQLException(GT.tr("Multiple ResultSets were returned by the query."),
            PSQLState.TOO_MANY_RESULTS);
    }

    return result.getResultSet();
}
```

executeQuery first calls executeWithFlags. In the source the call sits directly inside the if condition, which is not a recommended style: a call buried in an if condition is easy to overlook.

executeWithFlags

```java
public boolean executeWithFlags(int flags) throws SQLException {
    try {
        checkClosed();

        if (connection.getPreferQueryMode() == PreferQueryMode.SIMPLE) {
            flags |= QueryExecutor.QUERY_EXECUTE_AS_SIMPLE;
        }

        execute(preparedQuery, preparedParameters, flags);

        return (result != null && result.getResultSet() != null);
    } finally {
        defaultTimeZone = null;
    }
}

protected final void execute(CachedQuery cachedQuery, ParameterList queryParameters, int flags)
    throws SQLException {
    try {
        executeInternal(cachedQuery, queryParameters, flags);
    } catch (SQLException e) {
        // Don't retry composite queries as it might get partially executed
        if (cachedQuery.query.getSubqueries() != null
            || !connection.getQueryExecutor().willHealOnRetry(e)) {
            throw e;
        }
        cachedQuery.query.close();
        // Execute the query one more time
        executeInternal(cachedQuery, queryParameters, flags);
    }
}
```
This calls execute, which in turn calls executeInternal.
executeInternal
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgPreparedStatement.java
```java
private void executeInternal(CachedQuery cachedQuery, ParameterList queryParameters, int flags)
    throws SQLException {
    closeForNextExecution();

    // Enable cursor-based resultset if possible.
    if (fetchSize > 0 && !wantsScrollableResultSet() && !connection.getAutoCommit()
        && !wantsHoldableResultSet()) {
        flags |= QueryExecutor.QUERY_FORWARD_CURSOR;
    }

    if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
        flags |= QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS;

        // If the no results flag is set (from executeUpdate)
        // clear it so we get the generated keys results.
        //
        if ((flags & QueryExecutor.QUERY_NO_RESULTS) != 0) {
            flags &= ~(QueryExecutor.QUERY_NO_RESULTS);
        }
    }

    if (isOneShotQuery(cachedQuery)) {
        flags |= QueryExecutor.QUERY_ONESHOT;
    }
    // Only use named statements after we hit the threshold. Note that only
    // named statements can be transferred in binary format.

    if (connection.getAutoCommit()) {
        flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
    }

    // updateable result sets do not yet support binary updates
    if (concurrency != ResultSet.CONCUR_READ_ONLY) {
        flags |= QueryExecutor.QUERY_NO_BINARY_TRANSFER;
    }

    Query queryToExecute = cachedQuery.query;

    if (queryToExecute.isEmpty()) {
        flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
    }

    if (!queryToExecute.isStatementDescribed() && forceBinaryTransfers
        && (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) {
        // Simple 'Q' execution does not need to know parameter types
        // When binaryTransfer is forced, then we need to know resulting parameter and column types,
        // thus sending a describe request.
        int flags2 = flags | QueryExecutor.QUERY_DESCRIBE_ONLY;
        StatementResultHandler handler2 = new StatementResultHandler();
        connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler2, 0, 0,
            flags2);
        ResultWrapper result2 = handler2.getResults();
        if (result2 != null) {
            result2.getResultSet().close();
        }
    }

    StatementResultHandler handler = new StatementResultHandler();
    result = null;
    try {
        startTimer();
        connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows,
            fetchSize, flags);
    } finally {
        killTimerTask();
    }
    result = firstUnclosedResult = handler.getResults();

    if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
        generatedKeys = result;
        result = result.getNext();

        if (wantsGeneratedKeysOnce) {
            wantsGeneratedKeysOnce = false;
        }
    }

}
```

The key part is this call:

```java
connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows,
    fetchSize, flags);
```

By passing fetchSize through, only a result of the specified size is pulled. This eventually calls sendExecute and processResults to fetch the data.

postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/v3/QueryExecutorImpl.java

```java
private void sendExecute(SimpleQuery query, Portal portal, int limit) throws IOException {
    //
    // Send Execute.
    //

    if (logger.logDebug()) {
        logger.debug(" FE=> Execute(portal=" + portal + ",limit=" + limit + ")");
    }

    byte[] encodedPortalName = (portal == null ? null : portal.getEncodedPortalName());
    int encodedSize = (encodedPortalName == null ? 0 : encodedPortalName.length);

    // Total size = 4 (size field) + 1 + N (source portal) + 4 (max rows)
    pgStream.sendChar('E'); // Execute
    pgStream.sendInteger4(4 + 1 + encodedSize + 4); // message size
    if (encodedPortalName != null) {
        pgStream.send(encodedPortalName); // portal name
    }
    pgStream.sendChar(0); // portal name terminator
    pgStream.sendInteger4(limit); // row limit

    pendingExecuteQueue.add(new ExecuteRequest(query, portal, false));
}

protected void processResults(ResultHandler handler, int flags) throws IOException {
    boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
    boolean bothRowsAndStatus = (flags & QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS) != 0;

    List<byte[][]> tuples = null;

    int c;
    boolean endQuery = false;

    // At the end of a command execution we have the CommandComplete
    // message to tell us we're done, but with a describeOnly command
    // we have no real flag to let us know we're done. We've got to
    // look for the next RowDescription or NoData message and return
    // from there.
    boolean doneAfterRowDescNoData = false;

    while (!endQuery) {
        c = pgStream.receiveChar();
        switch (c) {
            case 'A': // Asynchronous Notify
                receiveAsyncNotify();
                break;

            case '1': // Parse Complete (response to Parse)
                pgStream.receiveInteger4(); // len, discarded

                SimpleQuery parsedQuery = pendingParseQueue.removeFirst();
                String parsedStatementName = parsedQuery.getStatementName();
                //...
        }
    }
}
```

next

postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgResultSet.java

```java
public boolean next() throws SQLException {
    checkClosed();
    if (onInsertRow) {
        throw new PSQLException(GT.tr("Can''t use relative move methods while on the insert row."),
            PSQLState.INVALID_CURSOR_STATE);
    }
    if (current_row + 1 >= rows.size()) {
        if (cursor == null || (maxRows > 0 && row_offset + rows.size() >= maxRows)) {
            current_row = rows.size();
            this_row = null;
            rowBuffer = null;
            return false; // End of the resultset.
        }

        // Ask for some more data.
        row_offset += rows.size(); // We are discarding some data.

        int fetchRows = fetchSize;
        if (maxRows != 0) {
            if (fetchRows == 0 || row_offset + fetchRows > maxRows) {
                // Fetch would exceed maxRows, limit it.
                fetchRows = maxRows - row_offset;
            }
        }
        // Execute the fetch and update this resultset.
        connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);
        current_row = 0;

        // Test the new rows array.
        if (rows.isEmpty()) {
            this_row = null;
            rowBuffer = null;
            return false;
        }
    } else {
        current_row++;
    }
    initRowBuffer();
    return true;
}
```
In next() you can see: it first checks whether current_row + 1 is less than rows.size(). If so, it simply increments current_row; otherwise the current batch of fetchSize rows has been fully consumed, and it must either detect the end of the result set or fetch the next batch, after which current_row is reset.
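The batching arithmetic in next() can be condensed into a small standalone sketch (a simulation for illustration only, not the driver's actual classes; the name batchSizes is made up). It mimics the fetchRows computation: fetch fetchSize rows per batch, but clamp the last fetch so it never reads past maxRows.

```java
import java.util.ArrayList;
import java.util.List;

public class CursorPagingSketch {
    // Returns the sequence of batch sizes a cursor-based scan would fetch,
    // mirroring the fetchRows clamping in PgResultSet.next().
    static List<Integer> batchSizes(int totalRows, int fetchSize, int maxRows) {
        List<Integer> batches = new ArrayList<>();
        int rowOffset = 0;
        int limit = (maxRows > 0) ? Math.min(totalRows, maxRows) : totalRows;
        while (rowOffset < limit) {
            int fetchRows = fetchSize;
            if (maxRows != 0 && (fetchRows == 0 || rowOffset + fetchRows > maxRows)) {
                fetchRows = maxRows - rowOffset; // fetch would exceed maxRows, limit it
            }
            if (fetchRows == 0) {
                fetchRows = limit - rowOffset; // fetchSize 0: take everything remaining
            }
            int got = Math.min(fetchRows, limit - rowOffset); // server has no more than this
            batches.add(got);
            rowOffset += got;
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(120, 50, 0));  // [50, 50, 20]
        System.out.println(batchSizes(120, 50, 70)); // [50, 20]: last fetch clamped by maxRows
    }
}
```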
```java
connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);
```
This call pulls the next batch of fetchRows rows.
- initRowBuffer
```java
private void initRowBuffer() {
    this_row = rows.get(current_row);
    // We only need a copy of the current row if we're going to
    // modify it via an updatable resultset.
    if (resultsetconcurrency == ResultSet.CONCUR_UPDATABLE) {
        rowBuffer = new byte[this_row.length][];
        System.arraycopy(this_row, 0, rowBuffer, 0, this_row.length);
    } else {
        rowBuffer = null;
    }
}
```
So after next() advances, the row about to be consumed is held in this_row; it is copied into rowBuffer only when the result set is updatable.
Summary
When querying large amounts of data, it is well worth setting fetchSize; otherwise pulling everything at once easily leads to OOM. But when using fetchSize, the data must be processed as you iterate over the ResultSet, rather than collected in full and only processed after everything has been returned.
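The point can be made concrete with a small illustrative sketch (a hypothetical rowSource stands in for a ResultSet; this is not JDBC code): aggregate each row while iterating and let it go, instead of materializing the whole result set first.

```java
import java.util.Iterator;
import java.util.stream.IntStream;

public class StreamingConsumption {
    // A stand-in for ResultSet.next()/getObject(): yields rows one at a time.
    static Iterator<Integer> rowSource(int totalRows) {
        return IntStream.rangeClosed(1, totalRows).iterator();
    }

    // Process each row during iteration; only one row is held at a time,
    // so memory use is independent of the total row count.
    static long sumWhileIterating(Iterator<Integer> rows) {
        long sum = 0;
        while (rows.hasNext()) {
            sum += rows.next(); // handle the row here, then discard it
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumWhileIterating(rowSource(100))); // 5050
    }
}
```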