java併發-模擬秒殺
阿新 • • 發佈:2018-12-22
模擬秒殺
思路
- 對每個秒殺請求入隊操作
- 當庫存為N時,佇列的長度超過N時,可以考慮拒絕後續請求,直接響應客戶端秒殺結束
- 為了減輕庫存處理的壓力,驗證併發量,這裡通過訊號量來控制執行緒安全。
編碼
- 通過Semaphore來控制併發量
- 通過CAS來控制更新庫存,保證執行緒安全
/***
* 模擬秒殺
*/
public class Knock {
/**
* CAS操作的類
*/
private static final Unsafe unsafe;
/**
* total的偏移量
*/
private static final long totalOffset;
/**
* 單例項
*/
private static volatile Knock knock;
/**
* 總數量
*/
private volatile int total;
/**
* 訊號量
*/
private Semaphore semaphore;
/**
* 初始化UnSafe
* 只能通過反射例項化
* 在catch出只能丟擲Error,unsafe只能初始化一次
*/
static {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
totalOffset = unsafe.objectFieldOffset(Knock.class.getDeclaredField("total"));
} catch (Exception e) {
e.printStackTrace();
throw new Error(e);
}
}
/**
* 初始化公平的訊號量
*
* @param total 總數
* @param threadsPerTime 執行緒所需要的訊號,控制併發量
*/
private Knock(int total, int threadsPerTime) {
this.total = total;
semaphore = new Semaphore(threadsPerTime, true);
}
/**
* 單例 lazy
*
* @param total
* @param threadsPerTime
* @return
*/
public static Knock getInstance(int total, int threadsPerTime) {
//one check
if (null != knock) {
return knock;
}
//double check
synchronized (Knock.class) {
if (null == knock) {
//knock需要加上volatile關鍵字,1.禁止重排序 2.執行緒間可見
knock = new Knock(total, threadsPerTime);
}
}
return knock;
}
public int getTotal() {
return total;
}
/**
* CAS 減法
* 死迴圈退出的條件
* 1. 總數大於0的情況下,去做一次CAS操作,操作成功,則返回,失敗則迴圈
* 2. 如果total==0的情況下,直接返回false,終止搶購
* @return
*/
public boolean casTotal(int except) {
for (; ; ) {
if (total > 0) {
int update = total - 1;
if (unsafe.compareAndSwapInt(this, totalOffset, except, update)) {
return true;
}
} else {
return false;
}
}
}
/**
* 搶購
*
* @param need
*/
public void doKnock(int need) {
//當佇列的長度時商品總量的兩倍就返回,搶購失敗
//當total==0的時候,搶購失敗
if (semaphore.getQueueLength() > (total << 1) || total == 0) {
System.out.println(Thread.currentThread().getId() + ":已售罄!");
return;
}
//搶購
try {
//獲取資源
semaphore.acquire(need);
//這裡要check total的值,不符合就直接返回
if (total == 0) {
System.out.println(Thread.currentThread().getId() + "已售罄!");
return;
}
//這裡必須通過區域性變數接收,因為訊號量模式,同一時間有多個執行緒在同時執行,是執行緒不安全的
int expect = total;
//CAS修改當前庫存
if (casTotal(expect)) {
//current是當前執行緒消費過後的庫存
int current = expect - 1;
System.out.println(Thread.currentThread().getId() + "當前剩餘:" + current);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//釋放資源
semaphore.release(need);
}
}
}
測試,通過CPU核心數,去控制併發的執行緒,提高QPS
public class KTest {
static class T extends Thread {
private Knock knock;
private int need;
public T(Knock knock, int need) {
this.knock = knock;
this.need = need;
}
@Override
public void run() {
knock.doKnock(need);
}
}
public static void main(String[] args) {
//獲取可用CPU核心數
int availableProcessors = Runtime.getRuntime().availableProcessors();
long s = System.currentTimeMillis();
Knock knock = Knock.getInstance(1100, availableProcessors*2);
for (int i = 0; i < 2000; i++) {
T t = new T(knock, 1);
try {
t.start();
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long e = System.currentTimeMillis();
System.out.println(knock.getTotal() + "======================"+(e-s));
}
}