1. 程式人生 > >ThreadPoolExecutor帶Queue緩沖隊列的線程池 + JMeter模擬並發下單請求

ThreadPoolExecutor帶Queue緩沖隊列的線程池 + JMeter模擬並發下單請求

acc discard blob 連接池 大數 private can imu led

.原文:https://blog.csdn.net/u011677147/article/details/80271174

拓展:

https://github.com/jwpttcg66/GameThreadPool/blob/85bb392151324e68addec355d85d9ce22b4ab1e2/src/test/java/com/snowcattle/game/thread/ThreadPoolTest.java
遊戲中常用的線程池,順序隊列和非順序隊列

@RestController
public class TestController {

    @Autowired
    TestThreadPoolManager testThreadPoolManager;

    
/** * 測試模擬下單請求 入口 * @param id * @return */ @GetMapping("/start/{idhaha}") public String start(@PathVariable Long idhaha) { //模擬的隨機數 String orderNo = System.currentTimeMillis() + UUID.randomUUID().toString(); testThreadPoolManager.addOrders(orderNo);
return "Test ThreadPoolExecutor start"; } /** * 停止服務 * @param id * @return */ @GetMapping("/end/{id}") public String end(@PathVariable Long id) { testThreadPoolManager.shutdown(); Queue q = testThreadPoolManager.getMsgQueue(); System.out.println(
"關閉了線程服務,還有未處理的信息條數:" + q.size()); return "Test ThreadPoolExecutor start"; } }

@Component
public class TestThreadPoolManager implements BeanFactoryAware {

    //用於從IOC裏取對象
    private BeanFactory factory; //如果實現Runnable的類是通過spring的application.xml文件進行註入,可通過 factory.getBean()獲取,這裏只是提一下


    private final static int CORE_POOL_SIZE = 2;

    private final static int MAX_POOL_SIZE = 10;

    private final static int KEEP_ALIVE_TIME = 0;
    // 線程池所使用的緩沖隊列大小
    private final static int WORK_QUEUE_SIZE = 50;

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        factory = beanFactory;
    }

    /**
     * 線程隊列:用於儲存在隊列中的訂單,防止重復提交,在真實場景中,可用redis代替 驗證重復
     */
    Map<String, Object> cacheMap = new ConcurrentHashMap<>();


    /**
     * 線程緩沖隊列:訂單的緩沖隊列,當線程池滿了,則將訂單存入到此緩沖隊列
     */
    Queue<Object> msgQueue = new LinkedBlockingQueue<Object>();


    /**
     * 在使用線程池並且使用有界隊列的時候,如果隊列滿了,任務添加到線程池的時候就會有問題,針對這些問題java線程池提供了以下幾種策略:
     * AbortPolicy
     * DiscardPolicy
     * DiscardOldestPolicy
     * CallerRunsPolicy
     * 自定義
     *
     * 當前采用的就是自定義:線程池的容量滿了,執行下面代碼,將訂單存入到緩沖隊列
     */
    final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            //訂單加入到緩沖隊列
            msgQueue.offer(((BusinessThread) r).getAcceptStr());
            System.out.println("系統任務太忙了(有界隊列已經滿了),把此訂單交給(調度線程池)逐一處理,訂單號:" + ((BusinessThread) r).getAcceptStr());
        }
    };


    /**創建線程池
     *
     * 概念解釋及原理(無實例):
     * https://uule.iteye.com/blog/1123185
     * https://www.cnblogs.com/trust-freedom/p/6594270.html
     * https://www.cnblogs.com/zedosu/p/6665306.html
     *
     *含實例的:
     * https://blog.csdn.net/x631617479/article/details/83001198 :自定義連接池ThreadPoolExecutor執行順序
     * https://blog.csdn.net/changyuan101/article/details/50755157 :ThreadPoolExecutor自定義RejectedExecutionHandler當隊列滿時改為調用BlockingQueue. put來實現生產者的阻塞
     *
     * 處理任務的優先級為:
     * corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用(調度線程池)handler處理被拒絕的任務。
     *
     *
     * */
    final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            CORE_POOL_SIZE,                             // 線程池維護線程的最少數量
            MAX_POOL_SIZE,                              // 線程池維護線程的最大數量
            KEEP_ALIVE_TIME,                            // 線程池維護線程所允許的空閑時間
            TimeUnit.SECONDS,                           // 線程池維護線程所允許的空閑時間的單位
            new ArrayBlockingQueue(WORK_QUEUE_SIZE),    // 線程池所使用的緩沖隊列
            this.handler                                // 線程池對拒絕任務的處理策略
    );


    /**將任務加入訂單線程池*/
    public void addOrders(String orderId){
        System.out.println("此訂單準備添加到線程池,訂單號:" + orderId);
        //驗證當前進入的訂單是否已經存在
        if (cacheMap.get(orderId) == null) {
            cacheMap.put(orderId, new Object());
            BusinessThread businessThread = new BusinessThread(orderId);
            threadPool.execute(businessThread);
        }
    }

    /**
     * 線程池的定時任務----> 稱為(調度線程池)。此線程池支持 定時以及周期性執行任務的需求。
     */
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);


    /**
     * 檢查(調度線程池),每秒執行一次,查看訂單的緩沖隊列是否有 訂單記錄,則重新加入到線程池
     */
    final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            //判斷緩沖隊列是否存在記錄
            if(!msgQueue.isEmpty()){
                //當線程池的隊列容量少於 WORK_QUEUE_SIZE(緩沖隊列max),則開始把緩沖隊列的訂單 加入到 線程池
                if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
                    String orderId = (String) msgQueue.poll();
                    BusinessThread businessThread = new BusinessThread(orderId);
                    threadPool.execute(businessThread);
                    System.out.println("(調度線程池)緩沖隊列出現訂單業務,重新添加到線程池,訂單號:"+orderId);
                }
            }
        }
    }, 0, 1, TimeUnit.SECONDS);


    /**獲取消息緩沖隊列*/
    public Queue<Object> getMsgQueue() {
        return msgQueue;
    }

    /**終止訂單線程池+調度線程池*/
    public void shutdown() {
        //true表示如果定時任務在執行,立即中止,false則等待任務結束後再停止
        System.out.println("終止訂單線程池+調度線程池:"+scheduledFuture.cancel(false));
        scheduler.shutdown();
        threadPool.shutdown();

    }
}

@Component
@Scope("prototype")//spring 多例
public class BusinessThread implements Runnable{

    private String acceptStr;

    public BusinessThread() {
        super();
    }

    public BusinessThread(String acceptStr) {
        this.acceptStr = acceptStr;
    }

    public String getAcceptStr() {
        return acceptStr;
    }

    public void setAcceptStr(String acceptStr) {
        this.acceptStr = acceptStr;
    }

    @Override
    public void run() {
        //業務操作
        System.out.println("多線程已經處理訂單插入系統,訂單號:"+acceptStr);

        //線程阻塞
        /*try {
            Thread.sleep(1000);
            System.out.println("多線程已經處理訂單插入系統,訂單號:"+acceptStr);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }*/
    }
}

ThreadPoolExecutor帶Queue緩沖隊列的線程池 + JMeter模擬並發下單請求