1. 程式人生 > >使用JDBC中的PreparedStatement批量插入

使用JDBC中的PreparedStatement批量插入

  • 工具類
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import 
java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import javax.sql.DataSource; import org.apache.log4j.Logger; /** * * @Title: J * @ClassName:JdbcUtil.java
* @Description: * * @Copyright 2016-2017 新開普 - Powered By 研發中心 * @author: FLY * @date:2017927下午5:01:29 * @version V1.0 */ public class JdbcUtil { // 定義資料庫的連結 private static Connection conn; // 定義sql語句的執行物件 private static PreparedStatement pstmt; // 定義查詢返回的結果集合 private static ResultSet rs; static
Logger jdbcLog = LoggerUtil.getLogger("gateway", "jdbc");// 資料庫連線日誌 /** * * @Title: 獲取資料庫連線 * @Description: * * @author: FLY * @date:2017927下午5:11:01 */ public static void getConnection() { try { Context ic = new InitialContext(); //此處要新增的是查詢資料來源的名字 但是要加字首java:comp/env DataSource source = (DataSource)ic.lookup("java:comp/env/jdbc/資料庫名"); conn = source.getConnection(); } catch (NamingException e) { jdbcLog.error("【資料來源沒找到!】,異常資訊:"+e); e.printStackTrace(); } catch (SQLException e) { jdbcLog.error("【獲取數連線物件失敗!】,異常資訊:"+e); e.printStackTrace(); } } public static boolean executeBatch(String sql, Object... objs) { int n = 0; try { getConnection(); jdbcLog.info("【進入批量處理】,sql" + sql+",引數:"+objs); // 那麼對於每一條insert語句,都會產生一條log寫入磁碟 conn.setAutoCommit(false); pstmt = conn.prepareStatement(sql); for (int i = 0; i < objs.length; i++) { pstmt.setObject(i + 1, objs[i]); // 1w條記錄插入一次 if (i % 10000 == 0){ pstmt.executeBatch(); conn.commit(); } } // 最後插入不足1w條的資料 int[] executeBatch = pstmt.executeBatch(); conn.commit(); //更新條數 n= executeBatch.length; jdbcLog.warn("【批量處理】,更新條數:"+n); } catch (SQLException e) { e.printStackTrace(); jdbcLog.warn("【批量處理異常】,異常資訊:"+e); } finally { close(conn, pstmt, rs); } return n > 0 ? true : false; } /** * 執行資料庫插入操作 * * @param datas 插入資料表中key為列名和value為列對應的值的Map物件的List集合 * @param tableName 要插入的資料庫的表名 * @return 影響的行數 * @throws SQLException SQL異常 */ public static int executeBatchInsert(String tableName, List<Map<String, Object>> datas) throws SQLException { long startTime=System.currentTimeMillis();//記錄開始時間 jdbcLog.info("【批量插入】,資料表:" + tableName+" ,要插入的資料:"+datas); /**影響的行數**/ int affectRowCount = -1; try { /**從資料庫連線池中獲取資料庫連線**/ getConnection(); /**設定不自動提交,以便於在出現異常的時候資料庫回滾**/ conn.setAutoCommit(false); Map<String, Object> valueMap = datas.get(0); /**獲取資料庫插入的Map的鍵值對的值**/ Set<String> keySet = valueMap.keySet(); Iterator<String> iterator = keySet.iterator(); /**要插入的欄位sql,其實就是用key拼起來的**/ StringBuilder columnSql = new StringBuilder(); /**要插入的欄位值,其實就是?**/ StringBuilder unknownMarkSql = new StringBuilder(); Object[] keys = new Object[valueMap.size()]; // 要執行到資料庫的SQL String sqlStr = ""; int i = 0; while (iterator.hasNext()) { String key = iterator.next(); keys[i] = key; columnSql.append(i == 0 ? "" : ","); columnSql.append(key); unknownMarkSql.append(i == 0 ? "" : ","); unknownMarkSql.append("?"); i++; } /**開始拼插入的sql語句**/ StringBuilder sql = new StringBuilder(); sql.append("INSERT INTO "); sql.append(tableName); sql.append(" ("); sql.append(columnSql); sql.append(" ) VALUES "); /*sql.append(" ) VALUES ("); sql.append(unknownMarkSql); sql.append(" )");*/ // jdbcLog.info("【批量插入】,資料表:" + tableName+" ,SQL"+sql.toString()); // StringBuffer sql = new StringBuffer(); int dataCount = datas.size(); for (int j = 0; j < dataCount; j++) { for (int k = 0; k < keys.length; k++) { // pstmt.setObject(k + 1, datas.get(j).get(keys[k])); /*sql.append(" ("); sql.append(datas.get(j).get(keys[k]));*/ if(k == 0){ sql.append(" ("); sql.append("'").append(datas.get(j).get(keys[k])).append("'"); sql.append(" ,"); }else if(k == keys.length-1){ sql.append("'").append(datas.get(j).get(keys[k])).append("'"); sql.append(" ),"); }else{ sql.append("'").append(datas.get(j).get(keys[k])).append("'"); sql.append(" ,"); } } // 1000個提交一次 if ((j != 0 && j % 1000 == 0) || j == dataCount - 1) { sqlStr = sql.substring(0, sql.length() - 1); // jdbcLog.info("【批量插入】,資料表:" + tableName+" ,最終SQL"+sqlStr); /**執行SQL預編譯**/ pstmt = conn.prepareStatement(""); pstmt.addBatch(sqlStr); } } int[] arr = pstmt.executeBatch(); conn.commit(); long endTime=System.currentTimeMillis();//記錄結束時間 float excTime=(float)(endTime-startTime)/1000; affectRowCount = arr.length; jdbcLog.info("【批量插入】,資料表:" + tableName+" ,最終SQL"+sql.toString()+" ,返回:["+ affectRowCount + "] "+" ,執行時間:["+excTime+"]s"); } catch (Exception e) { e.printStackTrace(); jdbcLog.error("【批量插入異常】,資料表:" + tableName+" ,異常資訊:"+e); if (conn != null) { conn.rollback(); } // throw e; } finally { close(conn, pstmt, rs); } return affectRowCount; } /** * 執行資料庫插入操作 * * @param datas 插入資料表中key為列名和value為列對應的值的Map物件的List集合 * @param tableName 要插入的資料庫的表名 * @return 影響的行數 * @throws SQLException SQL異常 */ public static int batchInsert(String tableName, List<Map<String, Object>> datas) throws SQLException { jdbcLog.info("【批量插入】,資料表:" + tableName+" ,要插入的資料:"+datas); /**影響的行數**/ int affectRowCount = -1; try { /**從資料庫連線池中獲取資料庫連線**/ getConnection(); conn.setAutoCommit(false); Map<String, Object> valueMap = datas.get(0); /**獲取資料庫插入的Map的鍵值對的值**/ Set<String> keySet = valueMap.keySet(); Iterator<String> iterator = keySet.iterator(); /**要插入的欄位sql,其實就是用key拼起來的**/ StringBuilder columnSql = new StringBuilder(); /**要插入的欄位值,其實就是?**/ StringBuilder unknownMarkSql = new StringBuilder(); Object[] keys = new Object[valueMap.size()]; int i = 0; while (iterator.hasNext()) { String key = iterator.next(); keys[i] = key; columnSql.append(i == 0 ? "" : ","); columnSql.append(key); unknownMarkSql.append(i == 0 ? "" : ","); unknownMarkSql.append("?"); i++; } /**開始拼插入的sql語句**/ StringBuilder sql = new StringBuilder(); sql.append("INSERT INTO "); sql.append(tableName); sql.append(" ("); sql.append(columnSql); sql.append(" ) VALUES ("); sql.append(unknownMarkSql); sql.append(" )"); /**執行SQL預編譯**/ pstmt = conn.prepareStatement(sql.toString()); /**設定不自動提交,以便於在出現異常的時候資料庫回滾**/ // conn.setAutoCommit(false); jdbcLog.info("【批量插入】,資料表:" + tableName+" ,SQL"+sql.toString()); for (int j = 0; j < datas.size(); j++) { for (int k = 0; k < keys.length; k++) { pstmt.setObject(k + 1, datas.get(j).get(keys[k])); } pstmt.addBatch(); } int[] arr = pstmt.executeBatch(); jdbcLog.info("【批量插入】,資料表:" + tableName+" ,最終SQL["+ pstmt.toString() + "] "); conn.commit(); affectRowCount = arr.length; jdbcLog.info("【批量插入】,資料表:" + tableName+" ,成功了插入了:["+ affectRowCount + "] "); } catch (Exception e) { e.printStackTrace(); jdbcLog.error("【批量插入異常】,資料表:" + tableName+" ,異常資訊:"+e); if (conn != null) { conn.rollback(); } // throw e; } finally { close(conn, pstmt, rs); } return affectRowCount; } /** * * @Title: 批量更新 * @param sql * @param objs * @return boolean * @Description: * * @author: FLY * @date:2017927下午4:40:38 */ public static boolean executeUpdate(String sql, Object... objs) { int n = 0; try { getConnection(); jdbcLog.info("【進入批量更新】,sql" + sql+",引數:"+objs); pstmt = conn.prepareStatement(sql); for (int i = 0; i < objs.length; i++) { pstmt.setObject(i + 1, objs[i]); } n = pstmt.executeUpdate(); jdbcLog.warn("【批量更新】,更新條數:"+n); } catch (SQLException e) { e.printStackTrace(); jdbcLog.warn("【批量更新異常】,異常資訊:"+e); } finally { close(conn, pstmt, rs); } return n > 0 ? true : false; } /** * * @Title: 查詢 * @param sql * @param objs * @return int * @Description: * * @author: FLY * @date:2017927下午4:44:57 */ public static int queryForInt(String sql, Object... objs) { try { getConnection(); pstmt = conn.prepareStatement(sql); for (int i = 0; i < objs.length; i++) { pstmt.setObject(i + 1, objs[i]); } rs = pstmt.executeQuery(); if (rs.next()) return rs.getInt(1); } catch (SQLException e) { e.printStackTrace(); } finally { close(conn, pstmt, rs); } return 0; } /** * * @Title: 釋放資料庫連線 * @param conn * @param stmt * @param rs void * @Description: * * @author: FLY * @date:2017927下午4:44:31 */ public static void close(Connection conn, Statement stmt, ResultSet rs) { if (rs != null){ try { rs.close(); } catch (Exception e) { e.printStackTrace(); jdbcLog.error("【釋放返回結果集異常】,異常資訊:"+e); } } if (stmt != null){ try { stmt.close(); } catch (Exception e) { e.printStackTrace(); jdbcLog.error("【釋放sql語句的執行物件異常】,異常資訊:"+e); } } if (conn != null){ try { conn.close(); } catch (Exception e) { e.printStackTrace(); jdbcLog.error("【釋放資料庫連線異常】,異常資訊:"+e); } } } }
  • 在資料庫連線語句上少了啟用rewriteBatchedStatements引數,可以再提高插入速度
jdbc:mysql://IP:PORT/database?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&rewriteBatchedStatements=true