1. 程式人生 > 其它 >多執行緒、執行緒池、StopWatch、CountDownLatch實戰

多執行緒、執行緒池、StopWatch、CountDownLatch實戰

技術標籤:Java高併發java

目錄

業務場景

  • 使用Java程式碼儘可能快地修改全部使用者的密碼為“123456”。

1.500萬大資料批處理;

2.多執行緒的使用;

3.執行緒池;

4.StopWatch 計時;

5.CountDownLatch 計數器。

(此處只是舉例多執行緒、執行緒池的使用,這個場景完全可以直接 UPDATE。)

程式碼結構

在這裡插入圖片描述

UserController.java

package com.acgkaka.test.controller;

import com.acgkaka.test.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.util.StopWatch;
import org.springframework.web.bind.
annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import java.math.BigDecimal; import java.math.RoundingMode; import java.time.format.DateTimeParseException; /** * 使用者資料更新入口類 * * @author ACGkaka */
@Controller @RequestMapping("user") public class UserController { @Autowired UserService service; /** * 更新使用者資料 * * @param startYmd 開始日期 * @param endYmd 結束日期 * @return 是否更新成功 */ @RequestMapping("update") @ResponseBody public String update(@RequestParam(value="startYmd")String startYmd, @RequestParam(value="endYmd")String endYmd) { try { StopWatch sw = new StopWatch(); sw.start(); // 業務邏輯處理 service.update(startYmd, endYmd); sw.stop(); String minutes = new BigDecimal(String.valueOf(sw.getTotalTimeSeconds()/60)).setScale(0, RoundingMode.DOWN).toString(); String seconds = new BigDecimal(String.valueOf(sw.getTotalTimeSeconds()%60)).setScale(0, RoundingMode.HALF_UP).toString(); return String.format("<h1>更新成功</h1><h1>耗時: %sm%ss", minutes, seconds); } catch(DateTimeParseException e) { e.printStackTrace(); return "<h1>更新失敗,日期引數格式異常,示例:20200101</h1>"; } catch(Exception e) { e.printStackTrace(); return "<h1>更新失敗," + e.getMessage() + "</h1>"; } } }

User.java

package com.acgkaka.test.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * <p> @Title User
 * <p> @Description 使用者
 *
 * @author ACGkaka
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {

    /** 使用者名稱 */
    private String username;

    /** 密碼 */
    private String password;

    /** 有效性; 1-有效,0-無效 */
    private int valid;

}

UserThreadHandle.java

package com.acgkaka.test.handle;

import com.acgkaka.test.entity.User;
import com.acgkaka.test.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * 多執行緒處理資料
 *
 * @author ACGkaka
 */
@Slf4j
public class UserThreadHandle implements Runnable {

    /** 資料集合 */
    private List<User> list;

    /** 計數器 */
    private CountDownLatch count;

    /** mapper */
    private UserMapper userMapper;

    /**
     *  構造方法 - 初始化多執行緒執行時需要的例項
     *
     * @param list 資料集合
     * @param userMapper mapper
     * @param count 計數器
     */
    public UserThreadHandle(List<User> list,
                            UserMapper userMapper,
                            CountDownLatch count) {
        this.list = list;
        this.userMapper = userMapper;
        this.count = count;
    }

    @Override
    public void run() {
        log.info(">>>>>>>>>> 正在執行: Thread ID: {}, Thread Name: {}", Thread.currentThread().getId(), Thread.currentThread().getName());
        try {
            if (list != null && !list.isEmpty()) {
                // 修改密碼為12345,此處只是舉例,這個場景完全可以直接用SQL UPDATE。
                list.forEach(item -> item.setPassword("123456"));
                // 注意批量儲存只有MySQL支援,Oracle不支援
                userMapper.saveOrUpdate(list);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            // 計數器 - 1(重要)
            count.countDown();
        }
    }
}

UserMapper.java

package com.acgkaka.test.mapper;

import com.acgkaka.test.entity.User;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * 使用者表資料庫訪問層
 *
 * @author ACGkaka
 */
@Mapper
public interface UserMapper {

    /**
     * 根據時間查詢條數
     *
     * @param ymd 日期
     * @return 條數
     */
    int queryCount(@Param("ymd") String ymd);

    /**
     * 根據日期,分頁查詢使用者資料
     *
     * @param ymd 日期
     * @param rowNumStart 開始行數
     * @param rowNumEnd 結束行數
     * @return 物件列表
     */
    List<User> queryAll(@Param("ymd") String ymd, @Param("rowNumStart") int rowNumStart, @Param("rowNumEnd") int rowNumEnd);

    /**
     * 批量更新
     * (前提是必須設定主鍵,會自動根據主鍵進行 “無則插入,有則更新”)
     *
     * @param list 待更新的資料
     */
    void saveOrUpdate(@Param("list") List<User> list);

}

UserMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.acgkaka.test.mapper.UserMapper">


    <resultMap type="com.acgkaka.test.entity.User" id="dataResult">
        <result column="username" property="username"/>
        <result column="password" property="password"/>
        <result column="valid" property="valid"/>
    </resultMap>

    <sql id="columns">USERNAME, PASSWORD, VALID</sql>

    <select id="queryCount" resultType="java.lang.Integer">
        SELECT count(1)
        FROM T_SYS_USER U
        <if test="ymd != null">
            WHERE U.CREATED_TIME = to_date(#{ymd}||' 00:00:00','yyyymmdd hh24:mi:ss')
        </if>
    </select>

    <!--通過實體作為篩選條件查詢-->
    <select id="queryAll" resultMap="dataResult">
        SELECT T.*
        FROM
            (SELECT ROWNUM ROWNO, <include refid="columns"/> FROM T_SYS_USER
            <if test="ymd != null">
                WHERE CREATED_TIME = to_date(#{ymd}||' 00:00:00','yyyymmdd hh24:mi:ss')
            </if>
            ) T
        WHERE T.ROWNO &gt;=#{rowNumStart} AND T.ROWNO &lt; #{rowNumEnd}
    </select>

    <insert id="saveOrUpdate" parameterType="java.util.List">
        INSERT INTO T_SYS_USER (<include refid="columns"/>) VALUES
        <foreach collection="list" item="item" index="index" separator=",">
            (#{item.username}, #{item.password}, #{item.valid})
        </foreach>
        ON DUPLICATE KEY UPDATE
        PASSWORD=VALUES(PASSWORD),
        VALID=VALUES(VALID)
    </insert>

</mapper>

UserService.java

package com.acgkaka.test.service;

/**
 * 使用者表服務介面
 *
 * @author ACGkaka
 */
public interface UserService {

    /**
     * 使用者資料採集
     *
     * @param startYmd 開始日期
     * @param endYmd 結束日期
     * @throws InterruptedException 執行緒異常
     */
    void update(String startYmd, String endYmd) throws InterruptedException;

}

UserServiceImpl.java

package com.acgkaka.test.service.impl;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.acgkaka.test.entity.User;
import com.acgkaka.test.handle.UserThreadHandle;
import com.acgkaka.test.mapper.UserMapper;
import com.acgkaka.test.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.*;

/**
 * (T_SYS_User)表服務實現類
 *
 * @author ACGkaka
 */
@Slf4j
@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private UserMapper userMapper;

    /** 執行緒池大小, 最佳執行緒數: 作業系統核心數 + 2 */
    private static final int POOL_SIZE = Runtime.getRuntime().availableProcessors() + 2;

    /** 資料長度, 經過測試 3000 的資料長度是最快的 */
    private static final int DATA_LENGTH = 3000;

    /** 重新命名執行緒 */
    private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("User-pool-%d").build();

    /**
     * 初始化執行緒池
     */
    private ExecutorService pool = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
            namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

    @Override
    public void update(String startYmd,String endYmd) throws InterruptedException {

        log.info("查詢引數,開始時間:{} 結束時間:{} ",startYmd,endYmd);

        LocalDate date = LocalDate.parse(startYmd, DateTimeFormatter.BASIC_ISO_DATE);
        LocalDate endDate = LocalDate.parse(endYmd, DateTimeFormatter.BASIC_ISO_DATE);

        while (date.compareTo(endDate) <= 0) {
            int rownum = 0;
            String ymd = DateTimeFormatter.BASIC_ISO_DATE.format(date);

            // 查詢數量,方便分頁
            int size = userMapper.queryCount(ymd);
            List<User> list = userMapper.queryAll(ymd,rownum, DATA_LENGTH);

            // 迴圈分頁查詢處理
            while (CollectionUtils.isNotEmpty(list)) {
                log.info("開始新的迴圈....");
                int countNum = (size - rownum) / DATA_LENGTH + 1 < POOL_SIZE ? (size - rownum) / DATA_LENGTH + 1 : POOL_SIZE;

                // 計數器,呼叫await方法分階段等待執行緒執行
                CountDownLatch count = new CountDownLatch(countNum);
                for (int i = 0; i < POOL_SIZE && CollectionUtils.isNotEmpty(list); i++) {
                    rownum+=DATA_LENGTH;

                    // 多執行緒處理
                    pool.execute(new UserThreadHandle(list, userMapper, count));
                    list = userMapper.queryAll(ymd,rownum, rownum + DATA_LENGTH);
                }
                log.info("正在等待...");
                count.await();
            }
            date = date.plusDays(1);
        }
    }
}

執行結果

在這裡插入圖片描述

文章為本人開發總結分享,如果有更快的方式,歡迎大家分享~





分享文章:

​ 這裡分享一篇大佬寫得比較詳細的《CountDownLatch用法詳解》https://blog.csdn.net/qq812908087/article/details/81112188