《Java高併發程式設計》學習 --7.10 多個Actor同時修改資料:Agent
阿新 • • 發佈:2019-01-10
在Actor的程式設計模型中,Actor之間主要通過訊息進行資訊傳遞。因此,很少發生多個Actor需要訪問一個共享變數的情況。但在實際開發中,這種情況很難完全避免。如果多個Actor需要對同一個共享變數進行讀寫時,如何保證執行緒安全呢?
在Akka中,使用一種叫做Agent的元件來實現這個功能。一個Agent提供了對一個變數的非同步更新。當一個Actor希望改變Agent的值時,它會向這個Agent下發一個動作(action)。當多個Actor同時改變Agent時,這些action將會在ExecutionContext中被併發排程執行。在任何時刻,一個Agent最多隻能執行一個action,對於某一個執行緒來說,它執行action的順序與它的發生順序一致,但對於不同執行緒來說,這些action可能會交織在一起。
Agent的修改可以使用兩個方法send()或者alter()。它們都可以向Agent傳送一個修改動作。但是send()方法沒有返回值,而alter()方法會返回一個Future物件便於跟蹤Agent的執行。
下面模擬一個場景:有10個Actor,它們一起對一個Agent執行累加操作,每個agent累加10000次,如果沒有意外,那麼agent最終的值將是100000,如果Actor間的排程出現問題,那麼這個值可能小於100000。
上述程式碼定義了一個累加的Actor:CounterActor,且定義了累計動作 action addMapper。它的作用是對Agent的值進行修改。 CounterActor的訊息處理函式onReceive()中,對全域性counterAgent進行累加操作,alter()指定了累加動作addMapper。由於我們希望在將來知道累加行為是否完成,因此在這裡將返回Future物件進行收集。完成任務後,Actor自行退出。 程式的主函式如下:public class CounterActor extends UntypedActor { Mapper<Integer, Integer> addMapper = new Mapper<Integer, Integer>() { public Integer apply(Integer i) { return i + 1; }; }; @Override public void onReceive(Object msg) throws Exception { if(msg instanceof Integer) { for(int i=0; i<10000; i++) { Future<Integer> f = AgentDemo.counterAgent.alter(addMapper); AgentDemo.futures.add(f); } getContext().stop(getSelf()); } else unhandled(msg); } }
上述程式碼中,建立了10個CounterActor物件。使用Inbox與CounterActor進行通訊。第14行的訊息將觸發CounterActor進行累加操作。第20~30行系統將等待所有10個CounterActor執行結束。執行完成後,我們便已經收集了所有的Future。在第32行,將所有的Future進行序列組合(使用sequence()方法),構造了一個整體的Future,併為它建立onComplete()回撥函式。在所有的Agent操作執行完成後,onComplete()方法就會被呼叫(第35行)。在這個例子中,簡單地輸出最終的counterAgent值,並關閉系統。 執行上述程式,得到結果:public class AgentDemo { public static Agent<Integer> counterAgent = Agent.create(0, ExecutionContexts.global()); static ConcurrentLinkedDeque<Future<Integer>> futures = new ConcurrentLinkedDeque<Future<Integer>>(); public static void main(String[] args) { final ActorSystem system = ActorSystem.create("agentdemo", ConfigFactory.load("samplehello.conf")); ActorRef[] counter = new ActorRef[10]; for(int i=0; i<counter.length; i++) { counter[i] = system.actorOf(Props.create(CounterActor.class),"counter_" + i); } final Inbox inbox = Inbox.create(system); for(int i=0; i<counter.length; i++) { inbox.send(counter[i], 1); inbox.watch(counter[i]); } int closeCount = 0; //等待所有Actor全部結束 while(true) { Object msg = inbox.receive(Duration.create(1, TimeUnit.SECONDS)); if(msg instanceof Terminated) { closeCount++; if(closeCount == counter.length) { break; } } else { System.out.println(msg); } } //等待所有的累加執行緒完成,因為他們都是非同步的 Futures.sequence(futures, system.dispatcher()).onComplete( new OnComplete<Iterable<Integer>>() { @Override public void onComplete(Throwable arg0, Iterable<Integer> arg1) throws Throwable { System.out.println("counterAgent=" + counterAgent.get()); system.shutdown(); } }, system.dispatcher()); } }
counterAgent=100000