多執行緒、執行緒池、StopWatch、CountDownLatch實戰
阿新 • • 發佈:2021-01-16
目錄
業務場景
- 使用Java程式碼儘可能快地修改全部使用者的密碼為“123456”。
1.500萬大資料批處理;
2.多執行緒的使用;
3.執行緒池;
4.StopWatch 計時;
5.CountDownLatch 計數器。
(此處只是舉例多執行緒、執行緒池的使用,這個場景完全可以直接 UPDATE。)
程式碼結構
UserController.java
package com.acgkaka.test.controller;
import com.acgkaka.test.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.util.StopWatch;
import org.springframework.web.bind. annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.format.DateTimeParseException;
/**
* 使用者資料更新入口類
*
* @author ACGkaka
*/
@Controller
@RequestMapping("user")
public class UserController {
@Autowired
UserService service;
/**
* 更新使用者資料
*
* @param startYmd 開始日期
* @param endYmd 結束日期
* @return 是否更新成功
*/
@RequestMapping("update")
@ResponseBody
public String update(@RequestParam(value="startYmd")String startYmd, @RequestParam(value="endYmd")String endYmd) {
try {
StopWatch sw = new StopWatch();
sw.start();
// 業務邏輯處理
service.update(startYmd, endYmd);
sw.stop();
String minutes = new BigDecimal(String.valueOf(sw.getTotalTimeSeconds()/60)).setScale(0, RoundingMode.DOWN).toString();
String seconds = new BigDecimal(String.valueOf(sw.getTotalTimeSeconds()%60)).setScale(0, RoundingMode.HALF_UP).toString();
return String.format("<h1>更新成功</h1><h1>耗時: %sm%ss", minutes, seconds);
} catch(DateTimeParseException e) {
e.printStackTrace();
return "<h1>更新失敗,日期引數格式異常,示例:20200101</h1>";
} catch(Exception e) {
e.printStackTrace();
return "<h1>更新失敗," + e.getMessage() + "</h1>";
}
}
}
User.java
package com.acgkaka.test.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* <p> @Title User
* <p> @Description 使用者
*
* @author ACGkaka
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {
/** 使用者名稱 */
private String username;
/** 密碼 */
private String password;
/** 有效性; 1-有效,0-無效 */
private int valid;
}
UserThreadHandle.java
package com.acgkaka.test.handle;
import com.acgkaka.test.entity.User;
import com.acgkaka.test.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* 多執行緒處理資料
*
* @author ACGkaka
*/
@Slf4j
public class UserThreadHandle implements Runnable {
/** 資料集合 */
private List<User> list;
/** 計數器 */
private CountDownLatch count;
/** mapper */
private UserMapper userMapper;
/**
* 構造方法 - 初始化多執行緒執行時需要的例項
*
* @param list 資料集合
* @param userMapper mapper
* @param count 計數器
*/
public UserThreadHandle(List<User> list,
UserMapper userMapper,
CountDownLatch count) {
this.list = list;
this.userMapper = userMapper;
this.count = count;
}
@Override
public void run() {
log.info(">>>>>>>>>> 正在執行: Thread ID: {}, Thread Name: {}", Thread.currentThread().getId(), Thread.currentThread().getName());
try {
if (list != null && !list.isEmpty()) {
// 修改密碼為12345,此處只是舉例,這個場景完全可以直接用SQL UPDATE。
list.forEach(item -> item.setPassword("123456"));
// 注意批量儲存只有MySQL支援,Oracle不支援
userMapper.saveOrUpdate(list);
}
} catch (Exception e) {
e.printStackTrace();
}finally {
// 計數器 - 1(重要)
count.countDown();
}
}
}
UserMapper.java
package com.acgkaka.test.mapper;
import com.acgkaka.test.entity.User;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* 使用者表資料庫訪問層
*
* @author ACGkaka
*/
@Mapper
public interface UserMapper {
/**
* 根據時間查詢條數
*
* @param ymd 日期
* @return 條數
*/
int queryCount(@Param("ymd") String ymd);
/**
* 根據日期,分頁查詢使用者資料
*
* @param ymd 日期
* @param rowNumStart 開始行數
* @param rowNumEnd 結束行數
* @return 物件列表
*/
List<User> queryAll(@Param("ymd") String ymd, @Param("rowNumStart") int rowNumStart, @Param("rowNumEnd") int rowNumEnd);
/**
* 批量更新
* (前提是必須設定主鍵,會自動根據主鍵進行 “無則插入,有則更新”)
*
* @param list 待更新的資料
*/
void saveOrUpdate(@Param("list") List<User> list);
}
UserMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.acgkaka.test.mapper.UserMapper">
<resultMap type="com.acgkaka.test.entity.User" id="dataResult">
<result column="username" property="username"/>
<result column="password" property="password"/>
<result column="valid" property="valid"/>
</resultMap>
<sql id="columns">USERNAME, PASSWORD, VALID</sql>
<select id="queryCount" resultType="java.lang.Integer">
SELECT count(1)
FROM T_SYS_USER U
<if test="ymd != null">
WHERE U.CREATED_TIME = to_date(#{ymd}||' 00:00:00','yyyymmdd hh24:mi:ss')
</if>
</select>
<!--通過實體作為篩選條件查詢-->
<select id="queryAll" resultMap="dataResult">
SELECT T.*
FROM
(SELECT ROWNUM ROWNO, <include refid="columns"/> FROM T_SYS_USER
<if test="ymd != null">
WHERE CREATED_TIME = to_date(#{ymd}||' 00:00:00','yyyymmdd hh24:mi:ss')
</if>
) T
WHERE T.ROWNO >=#{rowNumStart} AND T.ROWNO < #{rowNumEnd}
</select>
<insert id="saveOrUpdate" parameterType="java.util.List">
INSERT INTO T_SYS_USER (<include refid="columns"/>) VALUES
<foreach collection="list" item="item" index="index" separator=",">
(#{item.username}, #{item.password}, #{item.valid})
</foreach>
ON DUPLICATE KEY UPDATE
PASSWORD=VALUES(PASSWORD),
VALID=VALUES(VALID)
</insert>
</mapper>
UserService.java
package com.acgkaka.test.service;
/**
* 使用者表服務介面
*
* @author ACGkaka
*/
public interface UserService {
/**
* 使用者資料採集
*
* @param startYmd 開始日期
* @param endYmd 結束日期
* @throws InterruptedException 執行緒異常
*/
void update(String startYmd, String endYmd) throws InterruptedException;
}
UserServiceImpl.java
package com.acgkaka.test.service.impl;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.acgkaka.test.entity.User;
import com.acgkaka.test.handle.UserThreadHandle;
import com.acgkaka.test.mapper.UserMapper;
import com.acgkaka.test.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.*;
/**
* (T_SYS_User)表服務實現類
*
* @author ACGkaka
*/
@Slf4j
@Service
public class UserServiceImpl implements UserService {
@Autowired
private UserMapper userMapper;
/** 執行緒池大小, 最佳執行緒數: 作業系統核心數 + 2 */
private static final int POOL_SIZE = Runtime.getRuntime().availableProcessors() + 2;
/** 資料長度, 經過測試 3000 的資料長度是最快的 */
private static final int DATA_LENGTH = 3000;
/** 重新命名執行緒 */
private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("User-pool-%d").build();
/**
* 初始化執行緒池
*/
private ExecutorService pool = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
@Override
public void update(String startYmd,String endYmd) throws InterruptedException {
log.info("查詢引數,開始時間:{} 結束時間:{} ",startYmd,endYmd);
LocalDate date = LocalDate.parse(startYmd, DateTimeFormatter.BASIC_ISO_DATE);
LocalDate endDate = LocalDate.parse(endYmd, DateTimeFormatter.BASIC_ISO_DATE);
while (date.compareTo(endDate) <= 0) {
int rownum = 0;
String ymd = DateTimeFormatter.BASIC_ISO_DATE.format(date);
// 查詢數量,方便分頁
int size = userMapper.queryCount(ymd);
List<User> list = userMapper.queryAll(ymd,rownum, DATA_LENGTH);
// 迴圈分頁查詢處理
while (CollectionUtils.isNotEmpty(list)) {
log.info("開始新的迴圈....");
int countNum = (size - rownum) / DATA_LENGTH + 1 < POOL_SIZE ? (size - rownum) / DATA_LENGTH + 1 : POOL_SIZE;
// 計數器,呼叫await方法分階段等待執行緒執行
CountDownLatch count = new CountDownLatch(countNum);
for (int i = 0; i < POOL_SIZE && CollectionUtils.isNotEmpty(list); i++) {
rownum+=DATA_LENGTH;
// 多執行緒處理
pool.execute(new UserThreadHandle(list, userMapper, count));
list = userMapper.queryAll(ymd,rownum, rownum + DATA_LENGTH);
}
log.info("正在等待...");
count.await();
}
date = date.plusDays(1);
}
}
}
執行結果
文章為本人開發總結分享,如果有更快的方式,歡迎大家分享~
分享文章:
這裡分享一篇大佬寫得比較詳細的《CountDownLatch用法詳解》https://blog.csdn.net/qq812908087/article/details/81112188