自己實現定製自己的專屬java鎖,來高效規避不穩定的第三方
java juc 包下面已經提供了很多併發鎖工具供我們使用,但在日常開發中,為了各種原因我們總是會用多執行緒來併發處理一些問題,然而並不是所有的場景都可以使用juc 或者java本身提供的鎖來方便的幫助我們控制多執行緒帶來的併發問題,這個時候就需要我們根據自己的業務場景來子實現定製一把我們自己專屬的鎖,來滿足我們的需要。
假設系統對接了很多第三方公司,來幫助我們完成業務,但這些第三方的服務介面穩定性參差不齊,以往的過程中我們可能會做一些監控措施來幫助我們監控介面的穩定性,但這會存在一個問題,就是當我們監控到操作失敗的時候其實已經會有使用者產生操作失敗的結果了,這對重視使用者體驗的網際網路公司肯定是不能忍的,為此我們可以每個使用者來訪問時都同時呼叫多個第三方,只要有一個返回結果,我們就可以給使用者做相應的展示,這樣即使有一兩個第三方出現故障對使用者也是無感知的,但另一個問題來了,同時併發呼叫第三方我怎麼選哪個結果呢,很簡單,當然是返回最快的了!具體如何選用最快的返回結果就用到我們今天的主題了,定製自己的鎖。
上面所說的大致流程可以描述為這樣:接受使用者請求 → 多執行緒組裝報文呼叫第三方 → 阻塞等待 → 任意結果返回喚醒主執行緒繼續處理。基於此流程很自然想到這個阻塞其實就可以用多執行緒中的鎖來實現,主執行緒在將任務提交給執行緒池多執行緒處理後,去獲取一個鎖,而這個鎖需要線上程中第一個第三方返回結果時才能獲取到,這樣就讓主執行緒繼續執行,有了大概思路,我們來看下具體如何實現。
看過java 原始碼的同學對AbstractQueuedSynchronizer一定不會陌生,java中的很多鎖ReentrantLock 、ReadWriteLock 、ReentrantReadWriteLock 和一些其他的併發工具CountDownLatch、 Semaphore等都基於此抽象類實現,AbstractQueuedSynchronizer中通過一個FIFO佇列來管理等待加鎖的執行緒,通過一個state的int變數控制執行緒加鎖狀態,其內部也幫我實現了執行緒獲得鎖和掛起的方法,我們這裡參考CountDownLatch來實現,因為我們的需求和CountDownLatch正好相反,CountDownLatch是多個執行緒都處理完才能繼續,而我們是隻要有一個處理完就能繼續,簡單來說就是主執行緒喚醒的判斷條件不一致。先來看下CountDownLatch的使用:
public static void main(String[] args) throws InterruptedException { CountDownLatch await = new CountDownLatch(5); // 依次建立並啟動執行緒 for (int i = 0; i < 5; ++i) { new Thread(new MyRunnable(await)).start(); } await.await(); System.out.println("over!"); } class MyRunnable implements Runnable { private final CountDownLatch await; public MyRunnable(CountDownLatch await) { this.await = await; } public void run() { try { //業務處理 await.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }
熟悉套路之後為了更好的說明,我們引入其部分原始碼(jdk 1.8中部分方法)
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); } }
程式碼中可以看到,CountDownLatch主要就是倚靠一個內部類Sync來實現,而Sync實現了AbstractQueuedSynchronizer的tryAcquireShared和tryReleaseShared方法,這兩方法的主要目的是:tryAcquireShared就是在呼叫await方法後來判斷是否需要阻塞還是執行,tryReleaseShared 就是用來釋放state的狀態,而state的狀態又影響了tryAcquireShared的返回結果,決定了執行緒是阻塞還是會被喚起繼續執行,具體的判斷邏輯是在AbstractQueuedSynchronizer中的acquireSharedInterruptibly方法中
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
基於此我們的大概思路就是,利用自己的類實現tryAcquireShared和tryReleaseShared方法來幫助我們管理state狀態,決定主執行緒什麼時候可以獲得鎖繼續執行,什麼時候需要阻塞。依靠AbstractQueuedSynchronizer的內部機制幫助我們及時獲取子執行緒的處理資訊,最快的回到主執行緒來處理我們業務邏輯。實現程式碼如下:
package com.chengxiansheng.common; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class ReqBraker { private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); }
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { int c = getState(); c = 0; return true; } } private final Sync sync; private ResultDto resultDto;//處理結果 public ReqBraker(){ this.sync = new Sync(1); } /** * 請求返回 * 此處要考慮介面呼叫失敗的情況,如果失敗要等待其他執行緒則不呼叫此方法 */ public void reqReuturn(ResultDto resultDto){ this.resultDto = resultDto; sync.tryReleaseShared(1); } /** * 主執行緒等待指定最長等待時間 * @param timeout * @param unit * @return * @throws InterruptedException */ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * 獲取處理結果 * @return */ public ResultDto getResultDto(){ return resultDto; }
}
package com.chengxiansheng.common; public class RequsetWorker implements Runnable { private RequestDto requestDto;//請求資訊 private ReqBraker reqBraker; public RequsetWorker(RequestDto requestDto, ReqBraker reqBraker){ this.requestDto = requestDto; this.reqBraker = reqBraker; } public void run() { //請求第三方 //判斷返回結果 if(requestDto.isSeccess()){ reqBraker.reqReuturn(resultDto); } } }
具體的使用方法如下:
public void requestService(RequestDto request, List<Service> serviceList) { try { ReqBraker braker = new ReqBraker(); for (Service service : serviceList) { executorService.submit(new RequsetWorker(braker, request); } braker.await(5, TimeUnit.SECONDS); //請求處理完成 最長時間5S ResultDto result = braker.getResultDto(); if(result == null){ //可能超過了最終等待時間 //返回處理失敗; } //返回處理結果; } catch (Exception e) { //異常處理 }
當然實際的使用中可能會比這複雜,因為我們要有各種業務處理情況去要考慮,本文只是一個範例來幫助大家介紹一個新的思路來解決問題,合理利用Java中的工具的同事我們也要理解其實現原理,來定製符合我們使用場景的方法,才能寫出更高效的程式碼,實現更高效的系統。AbstractQueuedSynchronizer的作用也遠不止此,但我們掌握了它就可以更好的玩轉多執行緒,玩轉併發,來創新的實現各種複雜處理和邏