1. 程式人生 > >使用流讀取資料量大的檔案並存到mysql資料庫中

使用流讀取資料量大的檔案並存到mysql資料庫中

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicLong;

public class MozillaCellImporter {
    protected File file;
    protected String sqlTemplate = "INSERT user (name, age) VALUES (?, ?)";
    protected PreparedStatement statement;

    protected int batchSize = 100000;//預設一次存入資料庫數量這裡是10W條
    protected String url = "jdbc:mysql://localhost:3306/DBName?serverTimezone=UTC&useSSL=true&rewriteBatchedStatements=true";
    protected String username = "root";
    protected String password = "123456";

    protected Logger logger = LoggerFactory.getLogger(getClass());

    public MozillaCellImporter(String filepath, int batchSize){
        this.file = new File(filepath);
        this.batchSize = batchSize;
        if(!this.file.exists())
            throw new RuntimeException("file not exists: "+filepath);
    }

    public MozillaCellImporter dbSetting(String url, String username, String password){
        this.url = url;
        this.username = username;
        this.password = password;

        return this;
    }

    public String run() throws Exception {
        Connection connection = connection();
        statement = connection.prepareStatement(sqlTemplate);

        AtomicLong lineCount = new AtomicLong(0);
        AtomicLong count = new AtomicLong(0);

        try(BufferedReader reader = Files.newBufferedReader(file.toPath(), StandardCharsets.UTF_8)){
            reader.lines().forEach(line->{
                lineCount.addAndGet(1);
                if(lineCount.longValue() > 1){去掉第一條資料,根據自己的情況而定
                    try {
                        String temp[] = line.split(",");
                        statement.setInt(1, Integer.valueOf(temp[0]));
                        statement.setInt(2, Integer.valueOf(temp[1]));
                        statement.addBatch();
                        if(lineCount.longValue() % batchSize == 0){
                            count.addAndGet(statement.executeBatch().length);
                            logger.info(String.format("[%15d] insert done, batch size=%d....", lineCount.longValue(), batchSize));
                            statement.clearBatch();
                        }
                    } catch (SQLException e) {
                        logger.error(String.format("error on parse line : %s : %s", line, e.getMessage()));
                    }
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
        count.addAndGet(statement.executeBatch().length);
        statement.close();
        connection.close();

        return String.format("%d,%d", lineCount.longValue(), count.longValue());
    }

    private Connection connection() throws ClassNotFoundException, SQLException {
        Class.forName("com.mysql.jdbc.Driver");
        return DriverManager.getConnection(url, username, password);
    }
}

s 是要匯入的檔案路徑。

100000 是每一批次寫入資料庫的資料筆數(batchSize)。

run() 返回的 result 是以逗號分隔的字串,格式為「檔案總行數,成功存入資料庫的筆數」;其中檔案總行數包含被略過的第一行。

String result = null;
try {
    // s is the file path; 100000 is the number of rows written to the database per batch.
    result = new MozillaCellImporter(s, 100000).dbSetting(jdbcUrl, jdbcUsername, jdbcPassword).run();
} catch (Exception e) {
    e.printStackTrace();
}
// If run() threw, result is still null; fall back to an empty array instead of failing with
// a NullPointerException on split().
String[] strings = result != null ? result.split(",") : new String[0];