1. 程式人生 > >javaWeb 使用執行緒池+佇列解決"訂單併發"問題

javaWeb 使用執行緒池+佇列解決"訂單併發"問題

遇到問題:

最近做微信支付,專案上線一陣,發現一個問題。有一條訂單流水居然在資料庫的出現兩次。這個問題非常嚴重。

檢視微信回撥系統的介面程式碼發現程式碼是沒錯的(正常情況下),而這次遇到非正常情況了

原因:微信支付成功後回撥我們系統介面在極短時間回調了2次,微信官方文件說明了,是最短15s回撥一次。

前幾天微信支付抽風了,可能業務出現了波動。

簡單來說就是在併發情況下沒有做資料唯一性處理,不管怎麼樣這類併發情況都是有必要的處理。

解決方式:使用執行緒池+佇列

專案基於Spring,如果不用spring需要自己把

ThreadPoolManager.java

改成單例模式

1.寫一個Controller(Spring mvc)

/**
 * @author HeyS1
 * @date 2016/12/1
 * @description
 */
@Controller
public class ThreadPoolController {
    @Autowired
    ThreadPoolManager tpm;

    @RequestMapping("/pool")
    public
    @ResponseBody
    Object test() {
        for (int i = 0; i < 500; i++) {
            //模擬併發500條記錄
            tpm.processOrders
(Integer.toString(i)); } return "ok"; } }

2.執行緒池管理

/**
 * @author HeyS1
 * @date 2016/12/1
 * @description threadPool訂單執行緒池, 處理訂單
 * scheduler 排程執行緒池 用於處理訂單執行緒池由於超出執行緒範圍和佇列容量而不能處理的訂單
 */
@Component
public class ThreadPoolManager implements BeanFactoryAware {
    private static Logger log = LoggerFactory.getLogger(ThreadPoolManager.class);
    private
BeanFactory factory;//用於從IOC裡取物件 // 執行緒池維護執行緒的最少數量 private final static int CORE_POOL_SIZE = 2; // 執行緒池維護執行緒的最大數量 private final static int MAX_POOL_SIZE = 10; // 執行緒池維護執行緒所允許的空閒時間 private final static int KEEP_ALIVE_TIME = 0; // 執行緒池所使用的緩衝佇列大小 private final static int WORK_QUEUE_SIZE = 50; // 訊息緩衝佇列 Queue<Object> msgQueue = new LinkedList<Object>(); //用於儲存在佇列中的訂單,防止重複提交 Map<String, Object> cacheMap = new ConcurrentHashMap<>(); //由於超出執行緒範圍和佇列容量而使執行被阻塞時所使用的處理程式 final RejectedExecutionHandler handler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //System.out.println("太忙了,把該訂單交給排程執行緒池逐一處理" + ((DBThread) r).getMsg()); msgQueue.offer(((DBThread) r).getMsg()); } }; // 訂單執行緒池 final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler); // 排程執行緒池。此執行緒池支援定時以及週期性執行任務的需求。 final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5); // 訪問訊息快取的排程執行緒,每秒執行一次 // 檢視是否有待定請求,如果有,則建立一個新的AccessDBThread,並新增到執行緒池中 final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { if (!msgQueue.isEmpty()) { if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) { System.out.print("排程:"); String orderId = (String) msgQueue.poll(); DBThread accessDBThread = (DBThread) factory.getBean("dBThread"); accessDBThread.setMsg(orderId); threadPool.execute(accessDBThread); } // while (msgQueue.peek() != null) { // } } } }, 0, 1, TimeUnit.SECONDS); //終止訂單執行緒池+排程執行緒池 public void shutdown() { //true表示如果定時任務在執行,立即中止,false則等待任務結束後再停止 System.out.println(taskHandler.cancel(false)); scheduler.shutdown(); threadPool.shutdown(); } public Queue<Object> getMsgQueue() { return msgQueue; } //將任務加入訂單執行緒池 public void processOrders(String orderId) { if (cacheMap.get(orderId) == null) { cacheMap.put(orderId,new Object()); DBThread accessDBThread = (DBThread) factory.getBean("dBThread"); accessDBThread.setMsg(orderId); threadPool.execute(accessDBThread); } } //BeanFactoryAware @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { factory = beanFactory; } }

3.執行緒池中工作的執行緒

//執行緒池中工作的執行緒
@Component
@Scope("prototype")//spring 多例
public class DBThread implements Runnable {
    private String msg;
    private Logger log = LoggerFactory.getLogger(DBThread.class);

    @Autowired
    SystemLogService systemLogService;



    @Override
    public void run() {
        //模擬在資料庫插入資料
        Systemlog systemlog = new Systemlog();
        systemlog.setTime(new Date());
        systemlog.setLogdescribe(msg);
        //systemLogService.insert(systemlog);
        log.info("insert->" + msg);
    }


    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}

瀏覽器輸入地址127.0.0.1/pool

幾秒後關閉tomcat。

模擬500條資料,訂單執行緒池處理了117條。排程執行緒池處理5條

關閉tomcat,後還有378條未處理(這裡的實現需要用到spring監聽器)。加起來一共500

OK。完畢

spring監聽器,監聽tomcat關閉事件:

public class MyApplicationListener implements ApplicationListener<ApplicationEvent> {

    @Autowired
    ThreadPoolManager threadPoolManager;

    @Override
    public void onApplicationEvent(ApplicationEvent event) {

        if (event instanceof ContextClosedEvent) {
            XmlWebApplicationContext x = (XmlWebApplicationContext) event.getSource();
            //防止執行兩次。root application context 沒有parent,他就是老大
            if (x.getDisplayName().equals("Root WebApplicationContext")) {
                threadPoolManager.shutdown();
                Queue q = threadPoolManager.getMsgQueue();
                System.out.println("關閉了伺服器,還有未處理的資訊條數:" + q.size());
            }


        } else if (event instanceof ContextRefreshedEvent) {
//            System.out.println(event.getClass().getSimpleName()+" 事件已發生!");
        } else if (event instanceof ContextStartedEvent) {
//            System.out.println(event.getClass().getSimpleName()+" 事件已發生!");
        } else if (event instanceof ContextStoppedEvent) {
//            System.out.println(event.getClass().getSimpleName()+" 事件已發生!");
        } else {
//            System.out.println("有其它事件發生:"+event.getClass().getName());
        }
    }
}

spring配置一下

<bean id="springStartListener" class="com.temp.MyApplicationListener"></bean>

相關推薦

javaWeb 使用執行+佇列解決"訂單併發"問題

遇到問題: 最近做微信支付,專案上線一陣,發現一個問題。有一條訂單流水居然在資料庫的出現兩次。這個問題非常嚴重。 檢視微信回撥系統的介面程式碼發現程式碼是沒錯的(正常情況下),而這次遇到非正常情況了 原因:微信支付成功後回撥我們系統介面在極短時間回調了2次,微信官方文件說明

Java併發(二十一):執行實現原理 Java併發(十八):阻塞佇列BlockingQueue Java併發(十八):阻塞佇列BlockingQueue Java併發程式設計:執行的使用

一、總覽 執行緒池類ThreadPoolExecutor的相關類需要先了解:  (圖片來自:https://javadoop.com/post/java-thread-pool#%E6%80%BB%E8%A7%88) Executor:位於最頂層,只有一個 execute(Runnab

踩坑 Spring Cloud Hystrix 執行佇列配置

背景: 有一次在生產環境,突然出現了很多筆還款單被掛起,後來排查原因,發現是內部系統呼叫時出現了Hystrix呼叫異常。在開發過程中,因為核心執行緒數設定的比較大,沒有出現這種異常。放到了測試環境,偶爾有出現這種情況,後來在網上查詢解決方案,網上的方案是調整maxQueueSize屬性就好了,當時調整了一下

2、使用SPRING中的執行ThreadPoolTaskExecutor實現JAVA併發

new Thread的弊端如下:a. 每次new Thread新建物件效能差。b. 執行緒缺乏統一管理,可能無限制新建執行緒,相互之間競爭,及可能佔用過多系統資源導致宕機或oom。c. 缺乏更多功能,如定時執行、定期執行、執行緒中斷。相比new Thread,Java提供的四種執行緒池的好處在於:a

使用SPRING中的執行ThreadPoolTaskExecutor實現JAVA併發

使用SPRING中的執行緒池ThreadPoolTaskExecutor實現併發。 一:不需要返回值的情況  1,初始化執行緒池 Java程式碼   ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPool

執行,處理高併發問題,處理大資料量的方法

執行緒池個人認為,執行緒池的作用就是限制系統中執行執行緒的數量,避免伺服器超負荷;減少建立和銷燬執行緒的次數,從而減少了一些開銷。設計一個執行緒池單例,在內部建立指定數目的執行緒,並用一個執行緒空閒隊列表示可分配執行緒。注:還可以使用兩個靜態成員變數的方法限定最大執行緒數量。

ThreadPool.SetMaxThreads 執行 設定 最大併發 數量 失敗問題

很多時候設定這個會返回false。。。微軟寫了個限制。。。而且寫在API的角落裡面。。。--------不能將輔助執行緒的數目或 I/O 完成執行緒的數目設定為小於計算機的處理器數目。 誒。。。網上查了好幾次。。。沒有結果最後發現是這樣。。注意這個處理器數目意思是 邏輯處理器

執行+佇列 優先順序方式執行佇列任務

package com; import java.util.concurrent.TimeUnit; public class MyPriorityTask implements Runnable

執行佇列

ArrayBlockingQueue:基於陣列的FIFO佇列,是有界的,建立時必須指定大小 LinkedBlockingQueue: 基於連結串列的FIFO佇列,是無界的,預設大小是 Integer.MAX_VALUE synchronousQueue:一個比較特殊的佇列,雖

Java併發程式設計:4種執行和緩衝佇列BlockingQueue

一. 執行緒池簡介 1. 執行緒池的概念:           執行緒池就是首先建立一些執行緒,它們的集合稱為執行緒池。使用執行緒池可以很好地提高效能,執行緒池在系統啟動時即建立大量空閒的執行緒,程式將一個任務傳給執行緒池,執行緒池就會啟動一

springcloud非同步執行、高併發請求feign解決方案

ScenTaskTestApplication.java package com.test; import org.springframework.boot.SpringApplication; import org.springframework.boot.a

盤點java併發包提供的執行佇列

執行緒池 newCachedThreadPool() newFixedThreadPool(int nThreads) newSingleThreadPoolExecutor() newScheduledThreadPool(int corePoolSize

python併發程式設計之執行剩餘內容(執行佇列,執行)及協程

1. 執行緒的其他方法 import threading import time from threading import Thread,current_thread def f1(n): time.sleep(1) print('子執行緒名稱', current_thread()

基於C++11併發庫的執行與訊息佇列執行框架——std::thread類

1 前言  C++11標準在標準庫中為多執行緒提供了元件,這意味著使用C++編寫與平臺無關的多執行緒程式成為可能,而C++程式的可移植性也得到了有力的保證。    在之前我們主要使用的多執行緒庫要麼是屬於某個單獨平臺的,例如:POSIX執行緒庫(Linux),Windows

Java併發程式設計的藝術之九----執行

第一:降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。 第二:提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。 第三:提高執行緒的可管理性。執行緒是稀缺資源,如果無限制地建立,不僅會消耗系統資源, 還會降低系統的穩定性,使用執行緒池可以進行統

Python併發程式設計之執行/程序

Python併發程式設計之執行緒池/程序池 2017/01/18 · 基礎知識 · 2 評論 · 併發, 執行緒池, 程序池 原文出處: ZiWenXie    引言 Pyt

java併發程式設計一一執行原理分析(三)

合理的設定執行緒池的大小 接著上一篇探討執行緒留下的尾巴。如果合理的設定執行緒池的大小。 要想合理的配置執行緒池的大小、首先得分析任務的特性,可以從以下幾個角度分析: 1、任務的性質:CPU密集型任務、IO密集型任務、混合型任務等; 2、任務的優先順序:高、中、低; 3、任務的執行時

java併發程式設計一一執行原理分析(二)

2、執行緒池 1、什麼是執行緒池 Java中的執行緒池是運用場景最多的併發框架,幾乎所有需要非同步或併發執行任務的程式都可以使用執行緒池。 在開發工程中,合理的使用執行緒池能夠帶來3個好處。 第一:降低資源的消耗,通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗 第二:提

java併發程式設計一一執行原理分析(一)

1、併發包 1、CountDownLatch(計數器) CountDownLatch 類位於 java.util.concurrent 包下,利用它可以實現類似於計數器的功能。 比如有一個任務A,它要等待其他4個任務執行完成之後才能執行,此時就可以利用CountDownLatch

java併發學習--執行(一)

關於java中的執行緒池,我一開始覺得就是為了避免頻繁的建立和銷燬執行緒吧,先建立一定量的執行緒,然後再進行復用。但是要具體說一下如何做到的,自己又說不出一個一二三來了,這大概就是自己的學習習慣流於表面,不經常深入的結果吧。所以這裡決定系統的學習一下執行緒池的相關知識。   自己稍微總結了一下,