併發中如何保證快取DB雙寫一致性(JAVA栗子)
併發場景中大部分處理的是先更新DB,再(刪緩、更新)快取的處理方式,但是在實際場景中有可能DB更新成功了,但是快取設定失敗了,就造成了快取與DB資料不一致的問題,下面就以實際情況說下怎麼解決此類問題。
名詞 Cache:本文內指redis,ReadRequest:請求從Cache、Db中拿去資料,WriteRequest:資料寫入DB並刪除快取
若要保證資料庫與快取一直,我們需要採用先刪快取,在更新DB的情況,這時候有的同學可能會問,如果快取刪除成功了,而DB更新失敗了怎麼辦,其實仔細考慮一下,DB雖然失敗了,那真正是不會產生資料影響的,而當下次一次請求進來的時候,我們重新把DB中未更新的資料重新塞入快取,從結果上來看是沒有影響的。我們把請求分為ReadRequest 、WriteRequest,大部分同學都知道我們在使用Cache時 首先都會去Cache內查一下,如果Cache中沒有拿到資料我們在從資料庫中去獲取資料,這個時候在高併發的場景的踩過坑的同學都知道恰巧在這時候有更新請求把快取刪除了,這時候大量請求進來,Cache內沒有此項資料,請求就會直接落在DB上,就很容易造成快取雪崩,資料庫很可能瞬時就掛掉了,所以處理方案就是我們需要對查詢寫入的快取進行排隊處理,而正確從cache內獲取的姿勢:
1、每次查詢資料的時候我們吧請求資料放入佇列,由佇列消費者去檢查一下cache是否存在,不存在則進行插入,存在就跳過
2、當前readRequest就自迴圈,我們不斷嘗試從cache內去獲取資料,拿到資料或超時當前執行緒立即退出
3、如果拿到資料了就返回結果,沒有拿到資料我們就從DB去查
而WriteRequest 的處理相對就簡單多了我們直接刪除快取後,更新DB即可,下面上程式碼說明:
訊息佇列這裡我們基於jdk併發包內的BlockingQueue進行實現,使用MQ(Rabbit,Kafka等)的話思想差不多,只是需要互動一次mq的服務端。首先專案啟動時我們在程式後臺開闢監聽執行緒,從資料共享緩衝區(ArrayBlockingQueue)內監聽訊息
public class BlockQueueThreadPool { /** * 核心執行緒數 */ private Integer corePoolSize = 10; /** * 執行緒池最大執行緒數 */ private Integer maximumPoolSize = 20; /** * 執行緒最大存活時間 */ private Long keepAliveTime = 60L; private ExecutorService threadPool = new ThreadPoolExecutor(this.corePoolSize, this.maximumPoolSize, this.keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue(this.corePoolSize)); public BlockQueueThreadPool() { RequestQueue requestQueue = RequestQueue.getInstance(); BlockingQueue<RequestAction> queue = new ArrayBlockingQueue<>(this.corePoolSize); requestQueue.add(queue); this.threadPool.submit(new JobThread(queue)); } }
PS:ArrayBlockingQueue中很好的利用了Condition中的等待和通知功能,這裡我們就能實現對共享通道佇列的事件監聽了。
public class JobThread implements Callable<Boolean> { private BlockingQueue<RequestAction> queue; public JobThread(BlockingQueue<RequestAction> queue) { this.queue = queue; } @Override public Boolean call() throws Exception { try { while (true) { // ArrayBlockingQueue take方法 獲取佇列排在首位的物件,如果佇列為空或者佇列滿了,則會被阻塞住 RequestAction request = this.queue.take(); RequestQueue requestQueue = RequestQueue.getInstance(); Map<String, Boolean> tagMap = requestQueue.getTagMap(); if (request instanceof ReadRequest) { Boolean tag = tagMap.get(request.getIdentity()); if (null == tag) { tagMap.put(request.getIdentity(), Boolean.FALSE); } if (tag != null && tag) { tagMap.put(request.getIdentity(), Boolean.FALSE); } if (tag != null && !tag) { return Boolean.TRUE; } } else if (request instanceof WriteRequest) { // 如果是更新資料庫的操作 tagMap.put(request.getIdentity(), Boolean.TRUE); } // 執行請求處理 log.info("快取佇列執行+++++++++++++++++,{}", request.getIdentity()); request.process(); } } catch (Exception e) { e.printStackTrace(); } return Boolean.TRUE; } }
接下來就要定義我們的WriteRequest、ReadRequest了
@Slf4j public class ReadRequest<TResult> extends BaseRequest { public ReadRequest(String cacheKey, GetDataSourceInterface action) { super(cacheKey, action); } @Override public void process() { TResult result = (TResult) action.exec(); if (Objects.isNull(result)) { //防止快取擊穿 redis.set(cacheKey, "", 10000); } else { redis.set(cacheKey, result, 10000); } } }
public class WriteRequest<TResult> extends BaseRequest { public WriteRequest(String cacheKey, GetDataSourceInterface action) { super(cacheKey, action); } @Override public void process() { redis.del(cacheKey); action.exec(); } }
這裡我們需要坐下判斷,在資料庫內查詢資料為空後把“”寫入了快取,這樣子是避免有人惡意請求不存在的資料時造成快取擊穿。接下來就是我們針對各項業務場景中需要獲取與更新快取的路由端了
@UtilityClass public class RouteUtils { public static void route(RequestAction requestAction) { try { BlockingQueue<RequestAction> queue = RequestQueue.getInstance().getQueue(0); queue.put(requestAction); } catch (Exception e) { e.printStackTrace(); } } }
public class RequestQueue { private RequestQueue() { } private List<BlockingQueue<RequestAction>> queues = new ArrayList<>(); private Map<String, Boolean> tagMap = new ConcurrentHashMap<>(1); private static class Singleton { private static RequestQueue queue; static { queue = new RequestQueue(); } private static RequestQueue getInstance() { return queue; } } public static RequestQueue getInstance() { return Singleton.getInstance(); } public void add(BlockingQueue<RequestAction> queue) { this.queues.add(queue); } public BlockingQueue<RequestAction> getQueue(int index) { return this.queues.get(index); } public int size() { return this.queues.size(); } public Map<String, Boolean> getTagMap() { return this.tagMap; } }
這裡有一個小的知識點,很多時候我們在保證執行緒安全的時候多數會使用DSL雙鎖模型,但是我始終覺得這類程式碼不夠美觀,所以我們可以利用JVM的類載入原則,使用靜態類包裹初始化類,這樣子也一定能保證單例模型,並且程式碼也更美觀了。接下來就可以看下Service的程式碼
@Service public class StudentService { public Student getStudent(String name) { ReadRequest<Student> readRequest = new ReadRequest<>(name, () -> Student.builder().name(name).age(3).build()); return CacheProcessor.builder().build().getData(readRequest); } public void update(Student student) { WriteRequest<Student> writeRequest = new WriteRequest<>(student.getName(), () -> student); CacheProcessor.builder().build().setData(writeRequest); } }
Service內直接呼叫了Cachce的處理者,我們通過處理者來獲取快取與更新快取
@Builder public class CacheProcessor { public <TResult> TResult getData(ReadRequest readRequest) { try { RouteUtils.route(readRequest); long startTime = System.currentTimeMillis(); long waitTime = 0L; while (true) { if (waitTime > 3000) { break; } TResult result = (TResult) readRequest.redis.get(readRequest.getIdentity()); if (!Objects.isNull(result)) { return result; } else { Thread.sleep(20); waitTime = System.currentTimeMillis() - startTime; } } return (TResult) readRequest.get(); } catch (Exception e) { return null; } } public void setData(WriteRequest writeRequest){ RouteUtils.route(writeRequest); } }
這裡我們就先把請求資料傳送到資料共享渠道,消費者端與當前的ReadRequest執行緒同步執行,拿到資料後ReadRequest就立馬退出,超時後我們就從資料庫中獲取資料。這裡面我使用了java8 @FunctionalInterface 標記介面,對各個業務中需要用到快取的地方統一進行封裝方便呼叫,以上的程式碼就已經基本說明併發中Db和Cache雙休一致性的解決思路,聰明的小夥伴肯定能看出其實還有很多優化的地方,比如說我們栗子中是單執行緒吞吐量不高,採用多執行緒與多消費者端的時候我們還需要保證商品的更新和讀取請求需要落在同一個消費者端等等問題。或者在使用外部MQ時,我們除了要考慮以上同一商品的讀寫保證落在一個消費節點上,還需要考慮佇列內有插入快取請求的時候需要跳過的處理等等,更多情況還需要根據實際情況大家自己去發現咯
參考:中華石杉的教程
&n