dubbo 叢集容錯模式原始碼學習---ForkingCluster
阿新 • • 發佈:2018-10-31
ForkingCluster 模式
思想:服務端多個節點並行響應(併發任務)某個服務的請求,將執行完的結果放在一個阻塞佇列中(FIFO)。第一個完成的任務的結果進入佇列,將會馬上返回,不會等待所有任務執行完成, 只要有一個任務完成,結果放到佇列中,佇列中彈出第一個(最快完成響應的那個節點)結果, 返回服務請求方。如果n 個併發任務都出錯了,拋異常,將最後一個異常放到佇列中。服務請求返回就是拋異常。通常用於實時性高的讀操作,相對耗費資源。最大並行數要可配置。
public class ForkCluster { private final ExecutorService executor = Executors.newCachedThreadPool(); @Test public void run() { String response = forkCluster(); System.out.println(response); } /** * 思想:多個節點並行響應(併發任務)某個服務的請求,將執行完的結果放在一個阻塞佇列中(FIFO)。 * 不會等待所有任務執行完成, 只要有一個任務完成,結果放到佇列中,佇列中彈出第一個(最快完成響應的那個節點)結果, 返回服務請求方。 * 如果n 個併發任務都出錯了,拋異常,將最後一個異常放到佇列中。服務請求返回就是拋異常。 * * 通常用於實時性高的讀操作,相對耗費資源。最大並行數要可配置。 * * */ public String forkCluster() { Node node1 = new Node(1); Node node2 = new Node(2); Node node3 = new Node(3); Node node4 = new Node(4); List<Node> cluster = new ArrayList<>(); cluster.add(node1); cluster.add(node2); cluster.add(node3); cluster.add(node4); BlockingQueue<Object> responses = new LinkedBlockingQueue<>(); final AtomicInteger count = new AtomicInteger(); // 統計異常節點的個數. for (Node node : cluster) { executor.execute(new Runnable() { @Override public void run() { try { String response = node.doSomething(); responses.offer(response); }catch (Throwable e) { int value = count.incrementAndGet(); if (value >= cluster.size()) responses.offer(e); } } }); } try { Object resp = responses.poll(1000, TimeUnit.MILLISECONDS); if (resp instanceof Throwable) { throw new RuntimeException("Cannot perform cluster fork. "); } return (String)resp; } catch (InterruptedException e) { throw new RuntimeException("Cannot get cluster fork response."); } } class Node { private int id; Node(int id) { this.id = id; } String doSomething() { return "Node " + id + "第一個完成響應"; }; }
}