如何快速向資料庫加入10萬條資料
阿新 • • 發佈:2019-02-03
1、程式連線資料庫,使用c3p0執行緒池;
2、程式使用執行緒池,多執行緒程式設計;
3、採用Fork/Join框架執行緒池(工作竊取(work-stealing)演算法),更高效的多執行緒程式設計演算法。
直接貼程式碼,程式碼舉例中,近用小規模資料模擬大資料下的資料庫批量插入操作。
1、資料庫連線池
package com.example.jdbcConnection; import com.mchange.v2.c3p0.ComboPooledDataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; /** * Created by Liuxd on 2018/8/19. */ public class TestC3p0 { private static Connection conn; private static ComboPooledDataSource dataSource; static { try { //獲得c3p0連線池物件 dataSource = new ComboPooledDataSource(); dataSource.setUser("root"); dataSource.setPassword("root"); dataSource.setJdbcUrl("jdbc:mysql://127.0.0.1:3306/foo?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8&useSSL=false"); dataSource.setDriverClass("com.mysql.jdbc.Driver"); dataSource.setInitialPoolSize(2);//初始化池大小 dataSource.setMaxIdleTime(30);//最大空閒時間 dataSource.setMaxPoolSize(20);//最多連線數 dataSource.setMinPoolSize(2);//最少連線數 dataSource.setMaxStatements(50);//每次最多可以執行多少個批處理語句 } catch (Exception e) { e.printStackTrace(); } } /** * 查詢 */ private static List<Object[]> query() { List<Object[]> list = new ArrayList<Object[]>(); try { // 獲取資料庫連線 conn = dataSource.getConnection(); // 查詢sql String sql = "select * from user"; // 讀取資料 PreparedStatement preparedStatement = conn.prepareStatement(sql); //結果集 ResultSet resultSet = preparedStatement.executeQuery(); while (resultSet.next()) { int uid = resultSet.getInt("uid"); String name = resultSet.getString("name"); Integer age = resultSet.getInt("age"); String phone = resultSet.getString("phone"); String passwd = resultSet.getString("passwd"); Object[] objects = new Object[]{uid, name, age, phone, passwd}; list.add(objects); } resultSet.close(); preparedStatement.close(); //Connection連線物件歸還資料庫連線池 conn.close(); } catch (Exception e) { e.printStackTrace(); } return list; } /** * 新增 */ public static void add(String name, int age, String phone, String passwd) { try { // 獲取資料庫連線 conn = dataSource.getConnection(); String insertSql = "insert into `user` (`name`, `age`, `phone`, `passwd`) values(?,?,?,?)"; PreparedStatement ps = conn.prepareStatement(insertSql); ps.setString(1, name); ps.setInt(2, age); ps.setString(3, phone); ps.setString(4, passwd); int row = ps.executeUpdate(); System.out.println("新增結果: " + row); ps.close(); //Connection連線物件歸還資料庫連線池 conn.close(); } catch (Exception e) { e.printStackTrace(); } } /** * 修改 */ private static void update(int uid, String name, int age, String phone, String passwd) { try { // 獲取資料庫連線 conn = dataSource.getConnection(); String updateSql = "UPDATE USER t SET t.name=? ,t.age=?,t.phone=?,t.passwd=? WHERE t.uid=?"; PreparedStatement preparedStatement = conn.prepareStatement(updateSql); preparedStatement.setString(1, name); preparedStatement.setInt(2, age); preparedStatement.setString(3, phone); preparedStatement.setString(4, passwd); preparedStatement.setLong(5, uid); // 執行sql preparedStatement.executeUpdate(); int row = preparedStatement.executeUpdate(); System.out.println("修改結果: " + row); //Connection連線物件歸還資料庫連線池 conn.close(); preparedStatement.close(); } catch (Exception e) { e.printStackTrace(); } } /** * 刪除 */ private static void deleteById(int uid) { try { // 獲取資料庫連線 conn = dataSource.getConnection(); String sql = "delete from USER where uid=?"; PreparedStatement preparedStatement = conn.prepareStatement(sql); preparedStatement.setInt(1, uid); int row = preparedStatement.executeUpdate(); System.out.println("刪除結果: " + row); preparedStatement.close(); //Connection連線物件歸還資料庫連線池 conn.close(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { /** * 1、驗證連線數 */ for (int i = 0; i < 10; i++) { Connection connection = null; try { connection = dataSource.getConnection(); System.out.println(connection.toString()); } catch (SQLException e) { e.printStackTrace(); } finally { if (null != connection) { try { connection.close(); } catch (SQLException e) { e.printStackTrace(); } } } } /** * 2、查詢 */ List<Object[]> list = query(); if (null != list && list.size() > 0) { for (int i = 0; i < list.size(); i++) { Object[] objects = list.get(i); for (int j = 0; j < objects.length; j++) { System.out.print(objects[j] + " "); } System.out.println(); } } /** * 3、新增 */ String name = "樂樂"; int age = 17; String phone = "13800138001"; String passwd = "admin123"; add(name, age, phone, passwd); /** * 4、修改 */ update(12, name, age, phone, passwd); /** * 5、刪除 */ deleteById(3); } }
2、Fork/Join框架類
package com.example.jdbcConnection; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; /** * Created by Liuxd on 2018/8/23. */ public class TestForkJoinPool { public static void main(String[] args) throws Exception { System.out.println("*****************************程式開始執行*****************************"); // 建立執行緒池,包含Runtime.getRuntime().availableProcessors()返回值作為個數的並行執行緒的ForkJoinPool ForkJoinPool forkJoinPool = new ForkJoinPool(); TestC3p0 testC3p0 = new TestC3p0(); // 提交可拆分的Task任務 forkJoinPool.submit(new MyTask(0, 1000,testC3p0)); //阻塞當前執行緒直到 ForkJoinPool 中所有的任務都執行結束 forkJoinPool.awaitTermination(2, TimeUnit.SECONDS); // 關閉執行緒池 forkJoinPool.shutdown(); System.out.println("*****************************程式執行結束*****************************"); } }
3、Task執行類
package com.example.jdbcConnection; import java.util.concurrent.RecursiveAction; /** * * reated by Liuxd on 2018/8/23. */ class MyTask extends RecursiveAction { // 每個"小任務"最多執行儲存20個數 private static final int MAX = 20; private int start; private int end; private TestC3p0 testC3p0; MyTask(int start, int end,TestC3p0 testC3p0) { this.start = start; this.end = end; this.testC3p0=testC3p0; } @Override protected void compute() { // 當end-start的值小於MAX時候,開始執行 if ((end - start) < MAX) { for (int i = start; i < end; i++) { String name = "樂樂"+i; int age = 17+i; String phone = "1380013800"+i; String passwd = "admin123"+i; testC3p0.add(name, age, phone, passwd); System.out.println(Thread.currentThread().getName() + "儲存"+name+" "+age+" "+" "+phone+" "+passwd); } } else { // 將大任務分解成兩個小任務 int middle = (start + end) / 2; MyTask left = new MyTask(start, middle,testC3p0); MyTask right = new MyTask(middle, end,testC3p0); // 並行執行兩個小任務 left.fork(); right.fork(); } } }