1. 程式人生 > >java併發-模擬秒殺

java併發-模擬秒殺

模擬秒殺

思路

在這裡插入圖片描述

  • 對每個秒殺請求入隊操作
  • 當庫存為N時,佇列的長度超過N時,可以考慮拒絕後續請求,直接響應客戶端秒殺結束
  • 為了減輕庫存處理的壓力,驗證併發量,這裡通過訊號量來控制執行緒安全。

編碼

  • 通過Semaphore來控制併發量
  • 通過CAS來控制更新庫存,保證執行緒安全
/***
 * 模擬秒殺
 */
public class Knock {

    /**
     * CAS操作的類
      */
    private static final Unsafe unsafe;
    /**
     * total的偏移量
     */
private static final long totalOffset; /** * 單例項 */ private static volatile Knock knock; /** * 總數量 */ private volatile int total; /** * 訊號量 */ private Semaphore semaphore; /** * 初始化UnSafe * 只能通過反射例項化 * 在catch出只能丟擲Error,unsafe只能初始化一次 */
static { try { Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); unsafe = (Unsafe) field.get(null); totalOffset = unsafe.objectFieldOffset(Knock.class.getDeclaredField("total")); } catch (Exception
e) { e.printStackTrace(); throw new Error(e); } } /** * 初始化公平的訊號量 * * @param total 總數 * @param threadsPerTime 執行緒所需要的訊號,控制併發量 */ private Knock(int total, int threadsPerTime) { this.total = total; semaphore = new Semaphore(threadsPerTime, true); } /** * 單例 lazy * * @param total * @param threadsPerTime * @return */ public static Knock getInstance(int total, int threadsPerTime) { //one check if (null != knock) { return knock; } //double check synchronized (Knock.class) { if (null == knock) { //knock需要加上volatile關鍵字,1.禁止重排序 2.執行緒間可見 knock = new Knock(total, threadsPerTime); } } return knock; } public int getTotal() { return total; } /** * CAS 減法 * 死迴圈退出的條件 * 1. 總數大於0的情況下,去做一次CAS操作,操作成功,則返回,失敗則迴圈 * 2. 如果total==0的情況下,直接返回false,終止搶購 * @return */ public boolean casTotal(int except) { for (; ; ) { if (total > 0) { int update = total - 1; if (unsafe.compareAndSwapInt(this, totalOffset, except, update)) { return true; } } else { return false; } } } /** * 搶購 * * @param need */ public void doKnock(int need) { //當佇列的長度時商品總量的兩倍就返回,搶購失敗 //當total==0的時候,搶購失敗 if (semaphore.getQueueLength() > (total << 1) || total == 0) { System.out.println(Thread.currentThread().getId() + ":已售罄!"); return; } //搶購 try { //獲取資源 semaphore.acquire(need); //這裡要check total的值,不符合就直接返回 if (total == 0) { System.out.println(Thread.currentThread().getId() + "已售罄!"); return; } //這裡必須通過區域性變數接收,因為訊號量模式,同一時間有多個執行緒在同時執行,是執行緒不安全的 int expect = total; //CAS修改當前庫存 if (casTotal(expect)) { //current是當前執行緒消費過後的庫存 int current = expect - 1; System.out.println(Thread.currentThread().getId() + "當前剩餘:" + current); } } catch (Exception e) { e.printStackTrace(); } finally { //釋放資源 semaphore.release(need); } } }

測試,通過CPU核心數,去控制併發的執行緒,提高QPS

public class KTest {

    static class T extends Thread {
        private Knock knock;
        private int need;

        public T(Knock knock, int need) {
            this.knock = knock;
            this.need = need;
        }

        @Override
        public void run() {
            knock.doKnock(need);
        }
    }

    public static void main(String[] args) {
        //獲取可用CPU核心數
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        long s = System.currentTimeMillis();
        Knock knock = Knock.getInstance(1100, availableProcessors*2);
        for (int i = 0; i < 2000; i++) {
            T t = new T(knock, 1);
            try {
                t.start();
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        long e = System.currentTimeMillis();
        System.out.println(knock.getTotal() + "======================"+(e-s));
    }
}

GitHub主頁