1. 程式人生 > 實用技巧 >快速匯入上億行資料檔案到資料庫表(使用 JDBC 的 executeBatch)

快速匯入上億行資料檔案到資料庫表(使用 JDBC 的 executeBatch)

最近在 cnblogs 網站上,看其他人部落格,談及一個包含很多行(一億)的大檔案,一週之內,將其資料匯入到資料庫表。

我談到可以使用“使用資料庫事務,分批 commit 到資料庫,每批次有 5000行”的方法,提高資料匯入速度,兩天應該就可以了。

好像博主及下方評論者,不太理解,這個“分批 commit”

特寫此部落格,介紹一下使用 JDBC 的 executeBatch 做分批 commit,以提高大批量資料的匯入速度。

JDBC 有個PreparedStatement 類,包含addBatch,executeBatch 等函式(或稱之為方法,我不區分這兩個概念)。配合Connection 的setAutoCommit(false),commit(),即可實現“分批 commit”

當然,首先要逐行讀資料檔案。這裡的資料檔案,一般是 .txt 或 .csv 之類的純文字檔案,以逗號作為列分割,有的以 tab 做分割字元,也有的使用固定列寬(比如1-4字元為第一列,5-12為第二列...)。

我們使用BufferedReader 來實現逐行讀取。這是一個常用的 Java 類,可很好地用於此處檔案讀取。

為了方便起見,軟體將從 Java 命令列讀取 JVM 引數,舉例如下:

-Ddata_file=C:\svn_projects\sgm_small_projects\batch_data_import\data\sample_data_1w.csv -Dfrom_line=1 -Dto_line= -Dbatch_commit_size=5000 -Duse_multi_thread=false

其中,

引數data_file 為資料檔案;

from_line 用於指定資料檔案中的起始行號,最小值為1,一方面可用於跳過標題行,另一方面,可用於長時間執行過程中,如有中斷,可重新從某行開始;

to_line 用於指定資料檔案中的結束行號,可空;

batch_commit_size 用於指定每批次的資料行數,可調整,以便測試哪種引數,匯入資料最快,此處配置為5000;

use_multi_thread 用於指定程式是否使用多執行緒,此引數暫無用處。

大批量資料檔案匯入,一般的策略為:

a. 正確的資料,儘量全部匯入;

b. 錯誤的資料,跳過、記錄報錯位置,繼續執行;

c. 全部匯入完成後,分析錯誤的資料,特殊處理。

以下介紹的程式碼,可以很好地實現這幾個策略。執行時有類似如下的日誌資訊:

15:49:49.201 [main] INFO c.z.b.srv.DatabaseSrv - saveBatchDataInTrasaction begin,iBatchNum:1
15:49:51.416 [main] INFO c.z.b.srv.DatabaseSrv - 批量 commit,批次號:1, 原資料檔案行[1-5000], 提交成功.
15:49:51.422 [main] INFO c.z.b.srv.DatabaseSrv - saveBatchDataInTrasaction begin,iBatchNum:2
15:49:52.306 [main] INFO c.z.b.srv.DatabaseSrv - 批量 commit,批次號:2, 原資料檔案行[5001-10000], 提交成功.
15:49:52.329 [main] INFO c.z.b.srv.DatabaseSrv - saveBatchDataInTrasaction begin,iBatchNum:3
15:49:53.253 [main] INFO c.z.b.srv.DatabaseSrv - 批量 commit,批次號:3, 原資料檔案行[10001-15000], 提交成功.
15:49:53.277 [main] INFO c.z.b.srv.DatabaseSrv - saveBatchDataInTrasaction begin,iBatchNum:4
15:49:54.188 [main] INFO c.z.b.srv.DatabaseSrv - 批量 commit,批次號:4, 原資料檔案行[15001-20000], 提交成功.

主控程式BatchDataImportMain ,功能為讀取以上引數,然後讀資料檔案到 reader 物件,然後呼叫 dataSrv.saveData ,程式碼如下:

public class BatchDataImportMain {

    public static void main(String[] args) {
        Logger log = LoggerFactory.getLogger(BatchDataImportMain.class);
        try {
            log.info("從命令列引數中獲取資料...");
            String strDataFile = System.getProperty("data_file");

            // 資料檔案的第一行為1,不是0,方便使用者理解
            String strFromLine = System.getProperty("from_line", "1");

            String strToLine = System.getProperty("to_line");

            MutableObject<Long> iFromLine = null;
            if (StringUtils.isNotEmpty(strFromLine)) {
                iFromLine = new MutableObject<Long>();
                iFromLine.setValue(Long.parseLong(strFromLine));
            }

            MutableObject<Long> iToLine = null;
            if (StringUtils.isNotEmpty(strToLine)) {
                iToLine = new MutableObject<Long>();
                iToLine.setValue(Long.parseLong(strToLine));
            }

            String strBatchCommitSize = System.getProperty("batch_commit_size");
            int iBatchCommitSize = 5000;
            if (StringUtils.isNotEmpty(strBatchCommitSize)) {
                iBatchCommitSize = Integer.parseInt(strBatchCommitSize);
            }

            String strUseMultiThread = System.getProperty("use_multi_thread");
            boolean bUseMultiThread = false;
            if (StringUtils.equalsIgnoreCase(strUseMultiThread, "true")) {
                bUseMultiThread = true;
            }

            File fDataFile = new File(strDataFile);

            log.info("begin save data from file:" + fDataFile.getAbsolutePath());
            DataImportSrvBase dataSrv = null;
            if (bUseMultiThread) {
                dataSrv = new DataImportSrvUseThread();
            } else {
                dataSrv = new DataImportSrvNotUseThread();
            }

            try (FileInputStream fis = new FileInputStream(fDataFile)) {
                String charsetName = "gbk";
                try (InputStreamReader isr = new InputStreamReader(fis, charsetName)) {
                    try (BufferedReader br = new BufferedReader(isr)) {
                        dataSrv.saveData(br, iFromLine, iToLine, iBatchCommitSize, fDataFile.getName());
                    }
                }

            }
            log.info("ends save data from file:" + fDataFile.getAbsolutePath());

        } catch (Exception err) {
            log.error(err.getMessage(), err);
        }
    }

}

以上dataSrv 為不採用多執行緒的 DataImportSrvNotUseThread,如果功能是對 reader 中的資料,逐行取出,每5000行為一批次,呼叫資料儲存程式碼。記憶體佔用最多為5000行資料,不會導致記憶體溢位。

分批時,記錄當前批次的資料中,在原始資料檔案中的起始行號、結束行號、當前第幾批。

DataImportSrvNotUseThread 程式碼如下:

public class DataImportSrvNotUseThread extends DataImportSrvBase {

    @Override
    public void saveData(BufferedReader br, MutableObject<Long> iFromLine, MutableObject<Long> iToLine,
            int iBatchCommitSize, String fileName) throws IOException, SQLException {
        String strLine = null;
        // DataLineParseSrv dataSrv = new DataLineParseSrv();

        LinkedList<LineString> batchLineDataBufferList = new LinkedList<LineString>();
        long iBatchNum = 0;
        long iLineNumOfFile = 0;

        while ((strLine = br.readLine()) != null) {
            iLineNumOfFile++;
            if (iFromLine != null && iFromLine.getValue() > iLineNumOfFile) {
                continue;
            }
            if (iToLine != null && iToLine.getValue() < iLineNumOfFile) {
                break;
            }

            if (StringUtils.isEmpty(strLine)) {
                continue;
            }

            // LineData data = dataSrv.parse(line);
            LineString lineData = new LineString();
            lineData.strLine = strLine;
            lineData.lineNumAtFile = iLineNumOfFile;

            batchLineDataBufferList.add(lineData);

            if (batchLineDataBufferList.size() >= iBatchCommitSize) {
                iBatchNum++;
                long iLineNumBeginOfBatch = batchLineDataBufferList.getFirst().lineNumAtFile;
                long iLineNumEndOfBatch = lineData.lineNumAtFile;
                new DatabaseSrv().saveBatchDataInTrasaction(iBatchNum, iLineNumBeginOfBatch, iLineNumEndOfBatch,
                        batchLineDataBufferList);
                batchLineDataBufferList = new LinkedList<LineString>();
            }
        }

        if (batchLineDataBufferList.size() > 0) {
            iBatchNum++;
            long iLineNumBeginOfBatch = batchLineDataBufferList.getFirst().lineNumAtFile;
            long iLineNumEndOfBatch = batchLineDataBufferList.getLast().lineNumAtFile;

            new DatabaseSrv().saveBatchDataInTrasaction(iBatchNum, iLineNumBeginOfBatch, iLineNumEndOfBatch,
                    batchLineDataBufferList);
            batchLineDataBufferList = new LinkedList<LineString>();
        }
    }

}

最後,DatabaseSrv 類的 saveBatchDataInTrasaction 函式,儲存一批資料,使用一個數據庫連線、一個 transaction. 此函式內部,使用PreparedStatement 的executeBatch。

DatabaseSrv程式碼如下:

public class DatabaseSrv {
    static BasicDataSource g_ds = null;

    public void saveBatchDataInTrasaction(long iBatchNum, long iLineNumBeginOfBatch, long iLineNumEndOfBatch,
            List<LineString> batchLineDataBufferList) throws SQLException {
        Logger log = LoggerFactory.getLogger(DatabaseSrv.class);
        log.info("saveBatchDataInTrasaction begin,iBatchNum:" + iBatchNum);
        try {
            DataLineParseSrv dataSrv = new DataLineParseSrv();
            BasicDataSource ds = getDataSource();

            try (Connection con = ds.getConnection()) {
                con.setAutoCommit(false);
                con.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);

                String sql = "insert into tt_test(col_a,col_b,col_c,col_d,col_e,col_f,col_g,col_h,col_i,col_j,col_k,col_l) values(?,?,?,?,?,?,?,?,?,?,?,?);";
                try (PreparedStatement ps = con.prepareStatement(sql)) {
                    for (LineString d : batchLineDataBufferList) {
                        LineParsedData parsedData = dataSrv.parse(d);

                        long iLineNum = d.lineNumAtFile;// 也可以在表中,先增加一列,儲存資料行號。以便檢查哪些行成功匯入了。

                        ps.setString(1, parsedData.a); // 1 is the first ? (1 based counting)
                        ps.setString(2, parsedData.b);
                        ps.setString(3, parsedData.c);
                        ps.setString(4, parsedData.d);
                        ps.setString(5, parsedData.e);
                        ps.setString(6, parsedData.f);
                        ps.setString(7, parsedData.g);
                        ps.setString(8, parsedData.h);
                        ps.setString(9, parsedData.i);
                        ps.setString(10, parsedData.j);
                        ps.setString(11, parsedData.k);
                        ps.setString(12, parsedData.l);

                        ps.addBatch();
                    }
                    ps.executeBatch();
                    con.commit();
                    // statement.clearBatch(); //If you want to add more,
                } catch (Exception err) {
                    log.error(err.getMessage(), err);
                    con.rollback();
                }
            }

            log.info("批量 commit,批次號:" + iBatchNum + ", 原資料檔案行[" + iLineNumBeginOfBatch + "-" + iLineNumEndOfBatch
                    + "], 提交成功.");
        } catch (Exception err) {
            log.error(err.getMessage(), err);
            log.info("批量 commit,批次號:" + iBatchNum + ", 原資料檔案行[" + iLineNumBeginOfBatch + "-" + iLineNumEndOfBatch
                    + "], 提交失敗.");
        }

    }

    public static BasicDataSource getDataSource() {
        if (g_ds != null) {
            return g_ds;
        } else {
            BasicDataSource ds = new BasicDataSource();
            ds.setDriverClassName("org.postgresql.Driver");
            ds.setTestOnBorrow(true);
            ds.setUrl("jdbc:postgresql://192.168.1.50:5432/zg_prt_uld");
            ds.setValidationQuery("select 1 as a;");
            ds.setUsername("zg_prt_uld_db_user");
            ds.setPassword("zg_Hello~1234!");

            ds.setInitialSize(1);
            ds.setMaxActive(30);

            g_ds = ds;
            return g_ds;
        }
    }

}

還有一些重要性較低的程式碼,此處未貼出。如需要,也可提供。

經初步測試,以上程式碼,未使用多執行緒,匯入 2萬行資料,執行三次,用時分別為 5.696秒, 4.968 秒, 5.04 秒。

按第一次執行的速度(3511行/秒),匯入 2 億行資料,順利的話,完成匯入所用時間為 15.8小時。即使加上異常資料分析、特殊處理的操作,也能很好完成該博主的工作任務(1周之內完成資料匯入)。

當然,此處程式碼,仍有效能優化的餘地。

以上效能測試,使用的是 Postgres 資料庫,本地無線區域網連線。