Importing a Large List into the Database in Batches with Multiple Threads
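1. Introduction
A couple of days ago I built an import feature. At first it was very slow: importing 20,000 records took over a minute. I then optimized it step by step, from pushing the whole list into MySQL in one statement, to importing the list in groups, to importing the list with multiple threads. Each step cut the time a little further, which was very satisfying, and in the end the import finished in under 10 seconds. The process is shown below.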
2. Pushing the Whole List into MySQL in One Go
Use the MyBatis batch insert:
@Transactional(rollbackFor = Exception.class)
public int addFreshStudentsNew2(List<FreshStudentAndStudentModel> list, String schoolNo) {
    if (list == null || list.isEmpty()) {
        return 0;
    }
    List<StudentEntity> studentEntityList = new LinkedList<>();
    List<EnrollStudentEntity> enrollStudentEntityList = new LinkedList<>();
    List<AllusersEntity> allusersEntityList = new LinkedList<>();
    // Map each imported row to the three entities that are persisted.
    for (FreshStudentAndStudentModel freshStudentAndStudentModel : list) {
        EnrollStudentEntity enrollStudentEntity = new EnrollStudentEntity();
        StudentEntity studentEntity = new StudentEntity();
        BeanUtils.copyProperties(freshStudentAndStudentModel, studentEntity);
        BeanUtils.copyProperties(freshStudentAndStudentModel, enrollStudentEntity);
        String operator = TenancyContext.UserID.get();
        String studentId = BaseUuidUtils.base58Uuid();
        enrollStudentEntity.setId(BaseUuidUtils.base58Uuid());
        enrollStudentEntity.setStudentId(studentId);
        enrollStudentEntity.setIdentityCardId(freshStudentAndStudentModel.getIdCard());
        enrollStudentEntity.setOperator(operator);
        studentEntity.setId(studentId);
        studentEntity.setIdentityCardId(freshStudentAndStudentModel.getIdCard());
        studentEntity.setOperator(operator);
        studentEntityList.add(studentEntity);
        enrollStudentEntityList.add(enrollStudentEntity);
        AllusersEntity allusersEntity = new AllusersEntity();
        allusersEntity.setId(enrollStudentEntity.getId());
        allusersEntity.setUserCode(enrollStudentEntity.getNemtCode());
        allusersEntity.setUserName(enrollStudentEntity.getName());
        allusersEntity.setSchoolNo(schoolNo);
        allusersEntity.setTelNum(enrollStudentEntity.getTelNum());
        allusersEntity.setPassword(enrollStudentEntity.getNemtCode()); // the password is set to the candidate number
        allusersEntityList.add(allusersEntity);
    }
    // Each list goes to the database as a single multi-row insert.
    int enResult = enrollStudentDao.insertAll(enrollStudentEntityList);
    int stuResult = studentDao.insertAll(studentEntityList);
    boolean allResult = allusersFacade.insertUserList(allusersEntityList);
    if (enResult > 0 && stuResult > 0 && allResult) {
        return 10;
    }
    return -10;
}
Mapper.xml
<insert id="insertAll" parameterType="com.dmsdbj.itoo.basicInfo.entity.EnrollStudentEntity">
    insert into tb_enroll_student
    <trim prefix="(" suffix=")" suffixOverrides=",">
        id, remark, nEMT_aspiration, nEMT_code, nEMT_score, student_id, identity_card_id,
        level, major, name, nation, secondary_college, operator, sex, is_delete,
        account_address, native_place, original_place, used_name, pictrue, join_party_date,
        political_status, tel_num, is_registry, graduate_school, create_time, update_time
    </trim>
    values
    <foreach collection="list" item="item" index="index" separator=",">
        (
        #{item.id,jdbcType=VARCHAR},
        #{item.remark,jdbcType=VARCHAR},
        #{item.nemtAspiration,jdbcType=VARCHAR},
        #{item.nemtCode,jdbcType=VARCHAR},
        #{item.nemtScore,jdbcType=VARCHAR},
        #{item.studentId,jdbcType=VARCHAR},
        #{item.identityCardId,jdbcType=VARCHAR},
        #{item.level,jdbcType=VARCHAR},
        #{item.major,jdbcType=VARCHAR},
        #{item.name,jdbcType=VARCHAR},
        #{item.nation,jdbcType=VARCHAR},
        #{item.secondaryCollege,jdbcType=VARCHAR},
        #{item.operator,jdbcType=VARCHAR},
        #{item.sex,jdbcType=VARCHAR},
        0,
        #{item.accountAddress,jdbcType=VARCHAR},
        #{item.nativePlace,jdbcType=VARCHAR},
        #{item.originalPlace,jdbcType=VARCHAR},
        #{item.usedName,jdbcType=VARCHAR},
        #{item.pictrue,jdbcType=VARCHAR},
        #{item.joinPartyDate,jdbcType=VARCHAR},
        #{item.politicalStatus,jdbcType=VARCHAR},
        #{item.telNum,jdbcType=VARCHAR},
        #{item.isRegistry,jdbcType=TINYINT},
        #{item.graduateSchool,jdbcType=VARCHAR},
        now(),
        now()
        )
    </foreach>
</insert>
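Code notes:
The underlying mapper was generated by reverse engineering. The batch insert shown above concatenates every row into a single statement of the form:
insert into tb_enroll_student (...) values (...), (...), ..., (...);
The drawback is that MySQL limits how much data a single SQL operation may send, through the max_allowed_packet setting. The default here is 4194304 bytes, that is, 4 MB. When the list is large, the statement exceeds the limit and the database reports an error like this:
Packet for query is too large (6071393 > 4194304). You can change this value on the server by setting the max_allowed_packet' variable.
One way around it is to raise the limit by adding this line to my.ini:
max_allowed_packet = 67108864
Here 67108864 bytes is 64 MB. After editing my.ini the MySQL service has to be restarted; changing the value from the command line takes effect for new connections without a restart.
That gets the import through, but we cannot guarantee how large a single import in the project will ever be, so any fixed limit can still be exceeded. A better option is to import the data in groups.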
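To make the size concern concrete, here is a rough sketch of how one might estimate how many rows fit into a single multi-row insert. The class PacketBudget, the method maxRowsPerStatement, and the per-row and overhead byte figures are all hypothetical and not part of the original project; the real size depends on the actual column values, so treat the result as a heuristic only.

```java
// Hypothetical helper for illustration; not part of the original project.
final class PacketBudget {
    private PacketBudget() {}

    /**
     * Estimates how many rows fit into one multi-row INSERT statement.
     *
     * @param maxAllowedPacket  server limit in bytes, e.g. 4194304
     * @param statementOverhead assumed bytes used by the "insert into ... values" prefix
     * @param bytesPerRow       assumed worst-case bytes for one "(...)" tuple plus its separator
     */
    static int maxRowsPerStatement(long maxAllowedPacket, long statementOverhead, long bytesPerRow) {
        if (bytesPerRow <= 0 || statementOverhead < 0 || maxAllowedPacket <= statementOverhead) {
            throw new IllegalArgumentException("sizes must be positive and the overhead below the limit");
        }
        // Integer division rounds down, so the estimate never overshoots the budget.
        long rows = (maxAllowedPacket - statementOverhead) / bytesPerRow;
        return (int) Math.min(rows, Integer.MAX_VALUE);
    }
}
```

For example, with a 4194304-byte limit, an assumed 304 bytes of statement overhead and an assumed 300 bytes per row, maxRowsPerStatement returns 13980. In practice a much smaller fixed group size leaves a comfortable margin.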
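3. Importing the List into MySQL in Groups
This also uses the MyBatis batch insert. The difference is that the list is split into groups and inserted over several calls: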
@Transactional(rollbackFor = Exception.class)
public int addFreshStudentsNew2(List<FreshStudentAndStudentModel> list, String schoolNo) {
    if (list == null || list.isEmpty()) {
        return 0;
    }
    List<StudentEntity> studentEntityList = new LinkedList<>();
    List<EnrollStudentEntity> enrollStudentEntityList = new LinkedList<>();
    List<AllusersEntity> allusersEntityList = new LinkedList<>();
    // Map each imported row to the three entities that are persisted.
    for (FreshStudentAndStudentModel freshStudentAndStudentModel : list) {
        EnrollStudentEntity enrollStudentEntity = new EnrollStudentEntity();
        StudentEntity studentEntity = new StudentEntity();
        BeanUtils.copyProperties(freshStudentAndStudentModel, studentEntity);
        BeanUtils.copyProperties(freshStudentAndStudentModel, enrollStudentEntity);
        String operator = TenancyContext.UserID.get();
        String studentId = BaseUuidUtils.base58Uuid();
        enrollStudentEntity.setId(BaseUuidUtils.base58Uuid());
        enrollStudentEntity.setStudentId(studentId);
        enrollStudentEntity.setIdentityCardId(freshStudentAndStudentModel.getIdCard());
        enrollStudentEntity.setOperator(operator);
        studentEntity.setId(studentId);
        studentEntity.setIdentityCardId(freshStudentAndStudentModel.getIdCard());
        studentEntity.setOperator(operator);
        studentEntityList.add(studentEntity);
        enrollStudentEntityList.add(enrollStudentEntity);
        AllusersEntity allusersEntity = new AllusersEntity();
        allusersEntity.setId(enrollStudentEntity.getId());
        allusersEntity.setUserCode(enrollStudentEntity.getNemtCode());
        allusersEntity.setUserName(enrollStudentEntity.getName());
        allusersEntity.setSchoolNo(schoolNo);
        allusersEntity.setTelNum(enrollStudentEntity.getTelNum());
        allusersEntity.setPassword(enrollStudentEntity.getNemtCode()); // the password is set to the candidate number
        allusersEntityList.add(allusersEntity);
    }
    int batchSize = 100;
    int total = enrollStudentEntityList.size();
    boolean allSucceeded = true;
    // Insert batchSize rows per call; the final group holds whatever remains.
    for (int from = 0; from < total; from += batchSize) {
        int to = Math.min(from + batchSize, total);
        int enResult = enrollStudentDao.insertAll(enrollStudentEntityList.subList(from, to));
        int stuResult = studentDao.insertAll(studentEntityList.subList(from, to));
        boolean allResult = allusersFacade.insertUserList(allusersEntityList.subList(from, to));
        // Track every group, not only the last one.
        allSucceeded = allSucceeded && enResult > 0 && stuResult > 0 && allResult;
    }
    return allSucceeded ? 10 : -10;
}
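Code notes:
This avoids the error above, but splitting the work into many sequential inserts adds to the total running time, and the request can easily time out. So this approach is not really recommended either.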
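The grouping step itself can be pulled out into a small reusable helper. The sketch below is only an illustration: BatchSplitter and partition are hypothetical names that do not exist in the original project, and the returned groups are subList views, so the source list must not be modified while they are in use.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the original project.
final class BatchSplitter {
    private BatchSplitter() {}

    /** Splits source into consecutive views holding at most batchSize elements each. */
    static <T> List<List<T>> partition(List<T> source, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        int from = 0;
        while (from < source.size()) {
            // The last group may be shorter than batchSize.
            int to = from + Math.min(batchSize, source.size() - from);
            batches.add(source.subList(from, to));
            from = to;
        }
        return batches;
    }
}
```

With such a helper, each of the three entity lists could be partitioned with the same group size and inserted group by group, which keeps the index arithmetic in one place.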
The next improvement is to import the groups with multiple threads.
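4. Importing into MySQL in Batches with Multiple Threads
This still uses the MyBatis batch insert. The difference is that the list is split into groups according to the number of threads, and a thread pool is then created to run the imports.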
@Transactional(rollbackFor = Exception.class)
public int addFreshStudentsNew(List<FreshStudentAndStudentModel> list, String schoolNo) {
    if (list == null || list.isEmpty()) {
        return 0;
    }
    List<StudentEntity> studentEntityList = new LinkedList<>();
    List<EnrollStudentEntity> enrollStudentEntityList = new LinkedList<>();
    List<AllusersEntity> allusersEntityList = new LinkedList<>();
    // Map each imported row to the three entities that are persisted.
    list.forEach(freshStudentAndStudentModel -> {
        EnrollStudentEntity enrollStudentEntity = new EnrollStudentEntity();
        StudentEntity studentEntity = new StudentEntity();
        BeanUtils.copyProperties(freshStudentAndStudentModel, studentEntity);
        BeanUtils.copyProperties(freshStudentAndStudentModel, enrollStudentEntity);
        String operator = TenancyContext.UserID.get();
        String studentId = BaseUuidUtils.base58Uuid();
        enrollStudentEntity.setId(BaseUuidUtils.base58Uuid());
        enrollStudentEntity.setStudentId(studentId);
        enrollStudentEntity.setIdentityCardId(freshStudentAndStudentModel.getIdCard());
        enrollStudentEntity.setOperator(operator);
        studentEntity.setId(studentId);
        studentEntity.setIdentityCardId(freshStudentAndStudentModel.getIdCard());
        studentEntity.setOperator(operator);
        studentEntityList.add(studentEntity);
        enrollStudentEntityList.add(enrollStudentEntity);
        AllusersEntity allusersEntity = new AllusersEntity();
        allusersEntity.setId(enrollStudentEntity.getId());
        allusersEntity.setUserCode(enrollStudentEntity.getNemtCode());
        allusersEntity.setUserName(enrollStudentEntity.getName());
        allusersEntity.setSchoolNo(schoolNo);
        allusersEntity.setTelNum(enrollStudentEntity.getTelNum());
        allusersEntity.setPassword(enrollStudentEntity.getNemtCode()); // the password is set to the candidate number
        allusersEntityList.add(allusersEntity);
    });
    int size = enrollStudentEntityList.size();
    // Never start more threads than there are rows, so no worker receives an empty slice.
    int nThreads = Math.min(50, size);
    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
    List<Future<Integer>> futures = new ArrayList<>(nThreads);
    for (int i = 0; i < nThreads; i++) {
        // Multiply before dividing so the remainder is spread over the slices and the last slice ends at size.
        int from = (int) ((long) size * i / nThreads);
        int to = (int) ((long) size * (i + 1) / nThreads);
        final List<EnrollStudentEntity> enrollStudentEntityImportList = enrollStudentEntityList.subList(from, to);
        final List<StudentEntity> studentEntityImportList = studentEntityList.subList(from, to);
        final List<AllusersEntity> allusersEntityImportList = allusersEntityList.subList(from, to);
        Callable<Integer> task = () -> {
            studentSave.saveStudent(enrollStudentEntityImportList, studentEntityImportList, allusersEntityImportList);
            return 1;
        };
        futures.add(executorService.submit(task));
    }
    executorService.shutdown();
    // Wait for every worker; a failed or interrupted task fails the whole import.
    try {
        for (Future<Integer> future : futures) {
            future.get();
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return -10;
    } catch (ExecutionException e) {
        return -10;
    }
    return 10;
}
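Code notes:
The code above uses ExecutorService to create a fixed number of threads, splits the data into one group per thread, and imports each group as a batch. On the one hand this eases the pressure on the database; on the other, with more threads working in parallel, the running time of the program drops to some extent. The drawback is that the right number of threads depends on the server: on a well-provisioned machine you can start more threads, and on a weaker one you should start fewer.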
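The same pattern can be shown in isolation, without the project's entities. The following is a minimal sketch under stated assumptions: ParallelBatchImporter, importAll and the saver callback are hypothetical stand-ins (the saver plays the role of the DAO call and returns the number of rows it wrote), and the code uses ExecutorService.invokeAll so that the caller only returns once every batch has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.ToIntFunction;

// Hypothetical helper for illustration; not part of the original project.
final class ParallelBatchImporter {
    private ParallelBatchImporter() {}

    /** Saves rows in batches on a fixed thread pool and returns the total reported by the saver. */
    static <T> int importAll(List<T> rows, int batchSize, int nThreads, ToIntFunction<List<T>> saver) {
        if (batchSize <= 0 || nThreads <= 0) {
            throw new IllegalArgumentException("batchSize and nThreads must be positive");
        }
        List<Callable<Integer>> tasks = new ArrayList<>();
        int from = 0;
        while (from < rows.size()) {
            int to = from + Math.min(batchSize, rows.size() - from);
            // Copy the slice so each worker owns its batch independently of the source list.
            List<T> batch = new ArrayList<>(rows.subList(from, to));
            tasks.add(() -> saver.applyAsInt(batch));
            from = to;
        }
        if (tasks.isEmpty()) {
            return 0;
        }
        ExecutorService pool = Executors.newFixedThreadPool(Math.min(nThreads, tasks.size()));
        try {
            int total = 0;
            // invokeAll blocks until every task has completed or failed.
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                total += future.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("import interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a batch failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

A call such as importAll(rows, 100, 4, batch -> batch.size()) would run the batches on four threads and return the number of rows handed to the saver. One caveat worth keeping in mind: Spring binds a @Transactional transaction to the calling thread, so inserts executed on pool threads are not rolled back together with the outer method unless the worker manages its own transaction.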