利用多condition動態管理池化的非同步資源
阿新 • • 發佈:2019-12-31
背景
數字證書申請需要私鑰和CSR資料,但公私鑰對的生成,尤其是強度較高如2048位祕鑰,平均單個耗時接近300ms,且生成時間隨著隨機數的變化而出現較大的波動;
KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
generator.initialize(2048,new SecureRandom());
long startTime = System.currentTimeMillis();
for(int i = 0; i < 100; i++) {
generator.generateKeyPair();
}
System.out.println("cost: " + (System.currentTimeMillis() - startTime));
複製程式碼
為保證系統的快速響應,祕鑰對必須預先生成並儲存在祕鑰池中,在申請數字證書時直接從祕鑰池中讀取即可;
在系統早期,由於業務規模不大且業務量基本穩定,為了節省部署成本,祕鑰對生成與證書申請在同一個服務中執行,採用簡單的生產者消費者模型進行實現:
- 使用執行緒池進行祕鑰對的持續生成,執行緒池核心執行緒數=最大執行緒數=(Runtime.getRuntime().availableProcessors() - 1);
- 使用LinkedBlockingQueue儲存生成的祕鑰對,並設定佇列上限2000個;
問題
有一日,某網際網路金融客戶需要進行線上活動,由於客戶已知使用者建立和申請數字證書過程較為耗時,故客戶選擇提前批量建立使用者,再進行線上活動。結果可想而知:
- 證書申請服務本地祕鑰池被消耗殆盡,快取擊穿;
- 業務觸發同步生成祕鑰對,CPU資源競爭加劇,祕鑰池生成速度更加緩慢,使用者建立大量超時;
- 服務未統一網路出口,CA中心存在IP白名單限制,無法快速完成擴容;
- 即使完成擴容,因為祕鑰池為空,業務依然同步生成祕鑰,效果一般;
最後只能使出殺手鐗:限制客戶的流量,並與客戶溝通說系統進行了限流……
優化
原業務模型存在以下典型問題:
- 祕鑰對生成屬於後臺運算密集型業務,與使用者業務無關,但與業務存在CPU資源競爭,可導致證書申請業務效能不穩定;
- 祕鑰對生成能力與業務能力耦合,無法實現單項能力的快速擴容;
- 各Queue資料無法快速進行統一統計,無法快速簡單的監控祕鑰池中的快取物件數量;
- 服務重啟後已經生成的祕鑰對消失,導致系統不允許快速進行服務重啟;
針對上述問題,進行以下思路的優化:
- 引入redis作為集中式快取,通過快取池解耦使用者業務和後臺工具,並解決祕鑰池重啟丟失的問題;
- 使用redis快取的List結構儲存祕鑰對,祕鑰對最大上限為10W;
- 對redis中List的元素個數進行監控,數量低於總數的35%時進行告警,並以此可實現服務叢集動態自動擴容(暫未折騰);
- 使用reetrantLock和Condition控制多個佇列的特性進行生產者的生產控制;
- 祕鑰生成服務採用執行緒池生成祕鑰對,當redis池中總數滿時,執行緒進入await狀態,等待被喚醒;
- 祕鑰生成服務基於Spring Schedule採用固定間隔5s的速度輪詢池中物件數量,低於75%固定喚醒一半執行緒池執行緒生成祕鑰;低於50%時喚醒所有執行緒池執行緒生成祕鑰。
簡化實現
考慮到執行緒數遠遠小於快取物件數,所以控制10W只是一個近似值,多幾個少幾個在當前業務場景下對redis本身不會造成太大的影響
祕鑰對生成worker
/**
* 祕鑰對生成執行執行緒池
*/
public final class KeyPairExecutors {
// Redis Client 樁類
private final RedisCacheDAO cacheDAO;
// 鎖
private final ReentrantLock lock;
// 與執行緒數量相當的condition
private final List<Condition> workerConditions;
public KeyPairExecutors(ReentrantLock lock,List<Condition> workerConditions) {
this.cacheDAO = new RedisCacheDAO();
this.workerConditions = workerConditions;
this.lock = lock;
}
/**
* 工作執行緒構造方法
*/
public void start() {
int coreNum = workerConditions.size();
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(coreNum,coreNum,120,TimeUnit.SECONDS,new SynchronousQueue<>(),new KeyPairGeneratorThreadFactory("keypair_gen_",workerConditions.size()),new ThreadPoolExecutor.DiscardOldestPolicy());
Stream.iterate(0,n -> n + 1).limit(workerConditions.size())
.forEach( i -> poolExecutor.submit(new KeyPairRunable(cacheDAO,lock,workerConditions.get(i))));
}
class KeyPairRunable implements Runnable {
private final RedisCacheDAO cacheDAO;
private final ReentrantLock lock;
private final Condition condition;
public KeyPairRunable(RedisCacheDAO cacheDAO,ReentrantLock lock,Condition condition) {
this.cacheDAO = cacheDAO;
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
while(true) {
String keyBytes = genKeyPair();
try {
int currentSize = cacheDAO.listLpush(keyBytes);
// 寫入記錄後實時返回當前List元素數
if(currentSize >= RedisCacheDAO.MAX_CACHE_SIZE) {
System.out.println("cache is full. " + Thread.currentThread().getName() + " ready to park.");
lock.lock();
condition.await();
System.out.println("cache is consuming. " + Thread.currentThread().getName() + " unparked.");
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " is interuupted.");
} finally {
if(lock.isLocked()) {
lock.unlock();
}
}
}
}
private String genKeyPair() {
// TODO 祕鑰對樁
return "";
}
}
class KeyPairGeneratorThreadFactory implements ThreadFactory {
private final String threadGroupName;
private final AtomicInteger idSeq;
public KeyPairGeneratorThreadFactory(String threadGroupName,int maxSeq) {
this.threadGroupName = threadGroupName;
this.idSeq = new AtomicInteger(maxSeq);
}
@Override
public Thread newThread(Runnable r) {
int threadId = idSeq.getAndDecrement();
if(threadId < 0) {
throw new UnsupportedOperationException("thread number cannot be out of range");
}
return new Thread(r,threadGroupName + "_" + threadId);
}
}
}
複製程式碼
祕鑰對生成monitor
/**
* 祕鑰對生成定時排程
*/
public enum KeyPairsMonitor {
INSTANCE;
private final ReentrantLock reentrantLock;
private final List<Condition> conditionList;
private final RedisCacheDAO redisCacheDAO;
private final int coreSize;
KeyPairsMonitor() {
this.redisCacheDAO = new RedisCacheDAO();
this.reentrantLock = new ReentrantLock();
coreSize = Runtime.getRuntime().availableProcessors();
this.conditionList = new ArrayList<>(coreSize);
for( int i=0; i< coreSize; i++ ) {
conditionList.add(reentrantLock.newCondition());
}
}
/**
* 啟動金鑰生成任務,開啟排程
*/
public void monitor() {
KeyPairExecutors executors = new KeyPairExecutors(reentrantLock,conditionList);
executors.start();
buildMonitorSchedule();
}
/**
* 構造定時任務
*/
private void buildMonitorSchedule() {
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
int currentSize = redisCacheDAO.listLlen();
System.out.println("current cache size is: " + currentSize);
int executNum = 0;
if(currentSize <= RedisCacheDAO.HALF_MAX_CACHE_SIZE) {
System.out.println("current cache level is under 50% to ." + currentSize);
executNum = coreSize;
} else if(currentSize <= RedisCacheDAO.PERCENT_75_MAX_CACHE_SIZE) {
System.out.println("current cache level is under 75% to ." + currentSize);
executNum = coreSize >> 1;
}
for(int i=0; i < executNum; i++) {
try {
reentrantLock.lock();
conditionList.get(i).signal();
} catch (IllegalMonitorStateException e) {
// do nothing,condition no await
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
if(reentrantLock.isLocked()) {
reentrantLock.unlock();
}
}
}
}
},5,TimeUnit.SECONDS);
}
複製程式碼
打樁redis操作List操作:
public class RedisCacheDAO {
public static final String DEFAULT_KEYPAIE_CACHE_KEY = "keypaie_redis_list_rsa_byte";
public static final int MAX_CACHE_SIZE = 1 << 4;
public static final int HALF_MAX_CACHE_SIZE = MAX_CACHE_SIZE >> 1;
public static final int PERCENT_75_MAX_CACHE_SIZE = MAX_CACHE_SIZE - (MAX_CACHE_SIZE >> 2);
private String key;
private static final AtomicInteger count = new AtomicInteger(1);
public RedisCacheDAO() {
this.key = DEFAULT_KEYPAIE_CACHE_KEY;
}
public RedisCacheDAO(String key) {
this.key = key;
}
public int listLpush(String value) {
System.out.println(Thread.currentThread().getName() + " push value");
return count.addAndGet(1);
}
public int listLlen() {
return count.get();
}
public void listPop(int newValue) {
count.getAndSet(newValue);
}
}
複製程式碼
Main方法:
public static void main(String[] args) throws InterruptedException {
KeyPairsMonitor monitor = KeyPairsMonitor.INSTANCE;
monitor.monitor();
while (true) {
RedisCacheDAO dao = new RedisCacheDAO();
Thread.sleep(10);
dao.listPop(new Random().nextInt(RedisCacheDAO.MAX_CACHE_SIZE));
}
}
複製程式碼
思考
按照上述方案修改後,至少再面對流量峰值時,可以快速購買高核配置ECS,快速擴容。 但是,由於排程者是分散在各個伺服器中獨立執行的,如果排程執行緒異常退出,那麼該伺服器的工作執行緒將永遠不能工作。
目前考慮:
思路1: 採用分散式任務排程,採用統一維護的排程中心進行工作者的排程,但系統複雜度將遠大於當前實現;
思路2: 使用condition的超時機制實現自治,即設定condition的超時時間,超時過後主動生成寫入,配合快取的allkeys-random淘汰策略,即可緩解上述問題帶來的風險。