1. 程式人生 > 程式設計 >利用多condition動態管理池化的非同步資源

利用多condition動態管理池化的非同步資源

背景

數字證書申請需要私鑰和CSR資料,但公私鑰對的生成,尤其是強度較高如2048位祕鑰,平均單個耗時接近300ms,且生成時間隨著隨機數的變化而出現較大的波動;

KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
generator.initialize(2048,new SecureRandom());

long startTime = System.currentTimeMillis();
for(int i = 0; i < 100; i++) {
     generator.generateKeyPair();
}
System.out.println("cost: "
+ (System.currentTimeMillis() - startTime)); 複製程式碼

為保證系統的快速響應,祕鑰對必須預先生成並儲存在祕鑰池中,在申請數字證書時直接從祕鑰池中讀取即可;

在系統早期,由於業務規模不大且業務量基本穩定,為了節省部署成本,祕鑰對生成與證書申請在同一個服務中執行,採用簡單的生產者消費者模型進行實現:

  • 使用執行緒池進行祕鑰對的持續生成,執行緒池核心執行緒數=最大執行緒數=(Runtime.getRuntime().availableProcessors() - 1);
  • 使用LinkedBlockingQueue儲存生成的祕鑰對,並設定佇列上限2000個;

簡單分散式祕鑰池

問題

有一日,某網際網路金融客戶需要進行線上活動,由於客戶已知使用者建立和申請數字證書過程較為耗時,故客戶選擇提前批量建立使用者,再進行線上活動。結果可想而知:

  1. 證書申請服務本地祕鑰池被消耗殆盡,快取擊穿;
  2. 業務觸發同步生成祕鑰對,CPU資源競爭加劇,祕鑰池生成速度更加緩慢,使用者建立大量超時;
  3. 服務未統一網路出口,CA中心存在IP白名單限制,無法快速完成擴容;
  4. 即使完成擴容,因為祕鑰池為空,業務依然同步生成祕鑰,效果一般;

最後只能使出殺手鐗:限制客戶的流量,並與客戶溝通說系統進行了限流……

優化

原業務模型存在以下典型問題:

  • 祕鑰對生成屬於後臺運算密集型業務,與使用者業務無關,但與業務存在CPU資源競爭,可導致證書申請業務效能不穩定;
  • 祕鑰對生成能力與業務能力耦合,無法實現單項能力的快速擴容;
  • 各Queue資料無法快速進行統一統計,無法快速簡單的監控祕鑰池中的快取物件數量;
  • 服務重啟後已經生成的祕鑰對消失,導致系統不允許快速進行服務重啟;

針對上述問題,進行以下思路的優化:

  • 引入redis作為集中式快取,通過快取池解耦使用者業務和後臺工具,並解決祕鑰池重啟丟失的問題;
  • 使用redis快取的List結構儲存祕鑰對,祕鑰對最大上限為10W;
  • 對redis中List的元素個數進行監控,數量低於總數的35%時進行告警,並以此可實現服務叢集動態自動擴容(暫未折騰);
  • 使用reetrantLock和Condition控制多個佇列的特性進行生產者的生產控制;
  • 祕鑰生成服務採用執行緒池生成祕鑰對,當redis池中總數滿時,執行緒進入await狀態,等待被喚醒;
  • 祕鑰生成服務基於Spring Schedule採用固定間隔5s的速度輪詢池中物件數量,低於75%固定喚醒一半執行緒池執行緒生成祕鑰;低於50%時喚醒所有執行緒池執行緒生成祕鑰。

解耦後系統結構

簡化實現

考慮到執行緒數遠遠小於快取物件數,所以控制10W只是一個近似值,多幾個少幾個在當前業務場景下對redis本身不會造成太大的影響

祕鑰對生成worker

/**
 * 祕鑰對生成執行執行緒池
 */
public final class KeyPairExecutors {

    // Redis Client 樁類
    private final RedisCacheDAO cacheDAO;
    // 鎖
    private final ReentrantLock lock;
    // 與執行緒數量相當的condition
    private final List<Condition> workerConditions;

    public KeyPairExecutors(ReentrantLock lock,List<Condition> workerConditions) {
        this.cacheDAO = new RedisCacheDAO();
        this.workerConditions = workerConditions;
        this.lock  = lock;
    }

    /**
     * 工作執行緒構造方法
     */
    public void start() {

        int coreNum = workerConditions.size();
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(coreNum,coreNum,120,TimeUnit.SECONDS,new SynchronousQueue<>(),new KeyPairGeneratorThreadFactory("keypair_gen_",workerConditions.size()),new ThreadPoolExecutor.DiscardOldestPolicy());

        Stream.iterate(0,n -> n + 1).limit(workerConditions.size())
                .forEach( i -> poolExecutor.submit(new KeyPairRunable(cacheDAO,lock,workerConditions.get(i))));
    }

    class KeyPairRunable implements Runnable {

        private final RedisCacheDAO cacheDAO;
        private final ReentrantLock lock;
        private final Condition condition;

        public KeyPairRunable(RedisCacheDAO cacheDAO,ReentrantLock lock,Condition condition) {
            this.cacheDAO = cacheDAO;
            this.lock = lock;
            this.condition = condition;
        }

        @Override
        public void run() {

            while(true) {

                String keyBytes = genKeyPair();

                try {
                    int currentSize = cacheDAO.listLpush(keyBytes);
                    // 寫入記錄後實時返回當前List元素數
                    if(currentSize >= RedisCacheDAO.MAX_CACHE_SIZE) {
                            System.out.println("cache is full. " + Thread.currentThread().getName() + " ready to park.");

                            lock.lock();
                            condition.await();

                            System.out.println("cache is consuming. " + Thread.currentThread().getName() + " unparked.");
                    }
                } catch (InterruptedException e) {
                    System.out.println(Thread.currentThread().getName() + " is interuupted.");
                } finally {

                    if(lock.isLocked()) {
                        lock.unlock();
                    }
                }
            }
        }

        private String genKeyPair() {

            // TODO 祕鑰對樁
            return "";
        }
    }

    class KeyPairGeneratorThreadFactory implements ThreadFactory {

        private final String threadGroupName;
        private final AtomicInteger idSeq;

        public KeyPairGeneratorThreadFactory(String threadGroupName,int maxSeq) {
            this.threadGroupName = threadGroupName;
            this.idSeq = new AtomicInteger(maxSeq);
        }

        @Override
        public Thread newThread(Runnable r) {

            int threadId = idSeq.getAndDecrement();
            if(threadId < 0) {
                throw new UnsupportedOperationException("thread number cannot be out of range");
            }

            return new Thread(r,threadGroupName + "_" + threadId);
        }
    }
}
複製程式碼

祕鑰對生成monitor

/**
 * 祕鑰對生成定時排程
 */
public enum  KeyPairsMonitor {

    INSTANCE;

    private final ReentrantLock reentrantLock;
    private final List<Condition> conditionList;
    private final RedisCacheDAO redisCacheDAO;
    private final int coreSize;

    KeyPairsMonitor() {

        this.redisCacheDAO = new RedisCacheDAO();
        this.reentrantLock = new ReentrantLock();

        coreSize = Runtime.getRuntime().availableProcessors();
        this.conditionList = new ArrayList<>(coreSize);
        for( int i=0; i< coreSize; i++ ) {
            conditionList.add(reentrantLock.newCondition());
        }
    }

    /**
     * 啟動金鑰生成任務,開啟排程
     */
    public void monitor() {

        KeyPairExecutors executors = new KeyPairExecutors(reentrantLock,conditionList);
        executors.start();

        buildMonitorSchedule();
    }

    /**
     * 構造定時任務
     */
    private void buildMonitorSchedule() {

        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        service.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                int currentSize = redisCacheDAO.listLlen();
                System.out.println("current cache size is: " + currentSize);

                int executNum = 0;
                if(currentSize <= RedisCacheDAO.HALF_MAX_CACHE_SIZE) {
                    System.out.println("current cache level is under 50% to ." + currentSize);
                    executNum = coreSize;
                } else if(currentSize <= RedisCacheDAO.PERCENT_75_MAX_CACHE_SIZE) {
                    System.out.println("current cache level is under 75% to ." + currentSize);
                    executNum = coreSize >> 1;
                }

                for(int i=0; i < executNum; i++) {
                    try {
                        reentrantLock.lock();
                        conditionList.get(i).signal();
                    } catch (IllegalMonitorStateException e) {
                        // do nothing,condition no await
                    } catch (Exception e) {
                        System.out.println(e.getMessage());
                    } finally {
                        if(reentrantLock.isLocked()) {
                            reentrantLock.unlock();
                        }
                    }

                }
            }

        },5,TimeUnit.SECONDS);
    }
複製程式碼

打樁redis操作List操作:

public class RedisCacheDAO {

    public static final String DEFAULT_KEYPAIE_CACHE_KEY = "keypaie_redis_list_rsa_byte";
    public static final int MAX_CACHE_SIZE = 1 << 4;
    public static final int HALF_MAX_CACHE_SIZE = MAX_CACHE_SIZE >> 1;
    public static final int PERCENT_75_MAX_CACHE_SIZE = MAX_CACHE_SIZE - (MAX_CACHE_SIZE >> 2);

    private String key;
    private static final AtomicInteger count = new AtomicInteger(1);

    public RedisCacheDAO() {
        this.key = DEFAULT_KEYPAIE_CACHE_KEY;
    }

    public RedisCacheDAO(String key) {
        this.key = key;
    }

    public int listLpush(String value) {

        System.out.println(Thread.currentThread().getName() + " push value");
        return count.addAndGet(1);
    }

    public int listLlen() {
        return count.get();
    }

    public void listPop(int newValue) {
        count.getAndSet(newValue);
    }
}
複製程式碼

Main方法:

public static void main(String[] args) throws InterruptedException {

        KeyPairsMonitor monitor  = KeyPairsMonitor.INSTANCE;
        monitor.monitor();

        while (true) {
            RedisCacheDAO dao = new RedisCacheDAO();
            Thread.sleep(10);

            dao.listPop(new Random().nextInt(RedisCacheDAO.MAX_CACHE_SIZE));
        }
    }
複製程式碼

思考

按照上述方案修改後,至少再面對流量峰值時,可以快速購買高核配置ECS,快速擴容。 但是,由於排程者是分散在各個伺服器中獨立執行的,如果排程執行緒異常退出,那麼該伺服器的工作執行緒將永遠不能工作。

目前考慮:

思路1: 採用分散式任務排程,採用統一維護的排程中心進行工作者的排程,但系統複雜度將遠大於當前實現;

思路2: 使用condition的超時機制實現自治,即設定condition的超時時間,超時過後主動生成寫入,配合快取的allkeys-random淘汰策略,即可緩解上述問題帶來的風險。