分散式應用框架Akka快速入門
本文結合網上一些資料,對他們進行整理,摘選和翻譯而成,對Akka進行簡要的說明。引用資料在最後列出。
1.什麼是Akka
Akka 是一個用 Scala 編寫的庫,用於簡化編寫容錯的、高可伸縮性的 Java 和 Scala 的 Actor 模型應用。
Akka is a toolkit and runtime for building highly concurrent, distributed, and fault tolerant event-driven applications on the JVM.
Build powerful concurrent & distributed applications more easily
翻譯成中文就是:Akka是一個開發庫和執行環境,可以用於構建高併發、分散式、可容錯、事件驅動的基於JVM的應用。使構建高併發的分散式應用更加容易。
Akka可以以兩種不同的方式來使用
- 以庫的形式:在web應用中使用,放到 WEB-INF/lib 中或者作為一個普通的Jar包放進classpath。
- 以微核心的形式:你可以將應用放進一個獨立的核心。
2.Akka的五大特性
1)易於構建並行和分散式應用 (Simple Concurrency & Distribution)Akka在設計時採用了非同步通訊和分散式架構,並對上層進行抽象,如Actors、Futures ,STM等。
2)可靠性(Resilient by Design)
系統具備自愈能力,在本地/遠端都有監護。
3)高效能(High Performance)
在單機中每秒可傳送50000000個訊息。記憶體佔用小,1GB記憶體中可儲存2500000個actors。
4)彈性,無中心(Elastic — Decentralized)
自適應的負責均衡,路由,分割槽,配置5)可擴充套件(Extensible)
可以使用Akka 擴充套件包進行擴充套件。
3.什麼場景下特別適合使用Akka?
我們看到Akka被成功運用在眾多行業的眾多大企業,從投資業到商業銀行、從零售業到社會媒體、模擬、遊戲和賭博、汽車和交通系統、資料分析等等等等。任何需要高吞吐率和低延遲的系統都是使用Akka的候選。
Actor使你能夠進行服務失敗管理(監管者),負載管理(緩和策略、超時和隔離),水平和垂直方向上的可擴充套件性(增加cpu核數和/或增加更多的機器)管理。
所有以上這些都在這個Apache2許可的開源軟體中。
以下是Akka被部署到生產環境中的領域事務處理 (線上遊戲,金融/銀行業,貿易,統計,賭博,社會媒體,電信):垂直擴充套件,水平擴充套件,容錯/高可用性
服務後端 (任何行業,任何應用):提供REST, SOAP, Cometd, WebSockets 等服務 作為訊息匯流排/整合層 垂直擴充套件,水平擴充套件,容錯/高可用性
併發/並行 (任何應用):執行正確,方便使用,只需要將jar包新增到現有的JVM專案中(使用Scala,java, Groovy或jruby)
模擬:主/從,計算網格,MaReduce等等.
批處理 (任何行業):Camel整合來連線批處理資料來源 Actor來分治地批處理工作負載
通訊Hub (電信, Web媒體, 手機媒體):垂直擴充套件,水平擴充套件,容錯/高可用性
遊戲與賭博 (MOM, 線上遊戲, 賭博):垂直擴充套件,水平擴充套件,容錯/高可用性
商業智慧/資料探勘/通用資料處理:垂直擴充套件,水平擴充套件,容錯/高可用性
複雜事件流處理:垂直擴充套件,水平擴充套件,容錯/高可用性
4. Scala語言
Scala是一種多正規化的程式語言,設計初衷是要整合面向物件程式設計和函數語言程式設計的各種特性。
百度百科,Scala語言介紹:
百度文庫,Scala程式設計入門:
5.Actors模型
Actor模型並非什麼新鮮事物,它由Carl Hewitt於上世紀70年代早期提出,目的是為了解決分散式程式設計中一系列的程式設計問題。其特點如下: 系統中的所有事物都可以扮演一個Actor Actor之間完全獨立 在收到訊息時Actor所採取的所有動作都是並行的,在一個方法中的動作沒有明確的順序 Actor由標識和當前行為描述 Actor可能被分成原始(primitive)和非原始(non primitive)類別很多開發語言都提供了原生的Actor模型。例如erlang,scala等
Actor,可以看作是一個個獨立的實體,他們之間是毫無關聯的。但是,他們可以通過訊息來通訊。
一個Actor收到其他Actor的資訊後,它可以根據需要作出各種相應。訊息的型別可以是任意的,訊息的內容也可以是任意的。這點有點像webservice了。只提供介面服務,你不必瞭解我是如何實現的。
一個Actor如何處理多個Actor的請求呢?它先建立一個訊息佇列,每次收到訊息後,就放入佇列,而它每次也從佇列中取出訊息體來處理。通常我們都使得這個過程是迴圈的。讓Actor可以時刻處理髮送來的訊息。
6.示例(http://www.th7.cn/Program/java/2012/03/29/67015.shtml)
應用場景:服務端要處理大量的客戶端的請求,並且處理請求耗費較長的時間。這時就需要使用併發處理。多執行緒是一種方法,這裡使用Akka框架處理併發。(以下程式碼在Groovy1.7.5、akka-actors-1.2下執行成功)
這裡有三個角色:Client、Master、Worker
Client傻乎乎地發同步請求給Master,一直等到結果返回客戶端才離開。
Master接收客戶端發來的請求,然後將請求交給Worker處理,處理完成之後將結果返回給Client。
Worker負責具體的業務處理,它耗費的事件比較長。
所以這裡的關鍵在於Master,如果Master線性地“接收請求――呼叫Worker處理得到返回結果――將結果返回”,這樣的系統必將歇菜。
使用Akka可以方便地將它變成並行地。
先看看Client,模擬同時多個客戶端給Master發請求
import akka.actor.ActorRef
import static akka.actor.Actors.remote
class HelloClient implements Runnable {
int seq
String serviceName
HelloClient(int seq, String serviceName) {
this.seq = seq
this.serviceName = serviceName
}
void run() {
ActorRef actor = remote().actorFor(serviceName, "10.68.15.113", 9999);
String str = "Hello--" + seq
println "請求-----${str}"
Object res = actor.sendRequestReply(str)
println "返回-----${res}"
}
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(new HelloClient(i, "hello-service"))
thread.start() //同時啟動5個客戶端請求Master
}
}
}
真正幹活的Worker:
import akka.actor.UntypedActor
class HelloWorker extends UntypedActor { //Worker是一個Actor,需要實現onReceive方法
@Override
void onReceive(Object o) {
println "Worker 收到訊息----" + o
if (o instanceof String) {
String result = doWork(o) //呼叫真實的處理方法
getContext().replyUnsafe(result)//將結果返回給Master
}
}
//Worker處理其實很簡單,僅僅將引數字串改造一下而已。只不過使其sleep了20秒,讓它變得“耗時較長”
String doWork(String str) {
Thread.sleep(1000 * 20)
return "result----" + str + " 。"
}
}
負責併發排程的Master:
import akka.actor.ActorRef
import akka.actor.Actors
import akka.actor.UntypedActor
import akka.actor.UntypedActorFactory
import akka.dispatch.Future
import akka.dispatch.Futures
import java.util.concurrent.Callable
class HelloMaster extends UntypedActor {
@Override
void onReceive(Object o) {
println "Master接收到Work訊息:" + o
def clientChannel = getContext().channel() //客戶端連結Channel
//啟動worker actor
ActorRef worker = Actors.actorOf(new UntypedActorFactory() {
public UntypedActor create() {
return new HelloWorker();
}
}).start();
//這裡實現真正的併發
Future f1 = Futures.future(new Callable() {
Object call() {
def result = worker.sendRequestReply(o) //將訊息發給worker actor,讓Worker處理業務,同時得到返回結果
worker.stop()
println "Worker Return----" + result
clientChannel.sendOneWay(result) //將結果返回給客戶端
return result
}
})
println "Future call over"
}
public static void main(String[] args) { //啟動Master程序,繫結IP、埠和服務
Actors.remote().start("10.68.15.113", 9999).register(
"hello-service",
Actors.actorOf(HelloMaster.class));
}
}
看看客戶端的呼叫日誌
請求-----Hello--4
請求-----Hello--1
請求-----Hello--3
請求-----Hello--0
請求-----Hello--2
[GENERIC] [11-10-6 下午9:49] [RemoteClientConnected([email protected],/10.68.15.113:9999)]
[GENERIC] [11-10-6 下午9:49] [RemoteClientStarted([email protected],/10.68.15.113:9999)]
返回-----result----Hello--0 。
返回-----result----Hello--1 。
返回-----result----Hello--2 。
返回-----result----Hello--4 。
返回-----result----Hello--3 。
服務端的日誌:
[GENERIC] [11-10-6 下午9:49] [RemoteServerClientConnected([email protected],Some(/10.68.15.113:53462))]
Master接收到Work訊息:Hello--1
Future call over
Master接收到Work訊息:Hello--2
Future call over
Worker 收到訊息----Hello--1
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-13] [HelloWorker] started
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-13] [HelloWorker] started
Worker 收到訊息----Hello--2
Master接收到Work訊息:Hello--0
Future call over
Master接收到Work訊息:Hello--3
Worker 收到訊息----Hello--0
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-7] [HelloWorker] started
Future call over
Master接收到Work訊息:Hello--4
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-7] [HelloWorker] started
Worker 收到訊息----Hello--3
Future call over
Worker 收到訊息----Hello--4
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-7] [HelloWorker] started
Worker 將訊息Hello--1處理完成
Worker 將訊息Hello--2處理完成
Worker Return----result----Hello--2 。
Worker Return----result----Hello--1 。
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-13] [HelloWorker] stopping
Worker 將訊息Hello--0處理完成
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-3] [HelloWorker] stopping
Worker Return----result----Hello--0 。
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-23] [HelloWorker] stopping
Worker 將訊息Hello--4處理完成
Worker 將訊息Hello--3處理完成
Worker Return----result----Hello--4 。
Worker Return----result----Hello--3 。
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-11] [HelloWorker] stopping
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-10] [HelloWorker] stopping
可以從服務端日誌看到,Master接收到Work訊息後onReceive就結束了(函式最後列印Future call over),一連線收了5個訊息,然後Worker才收到訊息並處理。最後訊息處理完成好後f1的call才收到Worker Return的訊息。
這裡使用Future實現併發。
如果不使用Future:
def result = worker.sendRequestReply(o) //將訊息發給worker actor
println "Worker Return----" + result
getContext().replyUnsafe(result) // 將worker返回的訊息回覆給客戶端
這就成了同步處理(第一個訊息處理完後才接收並處理第二個訊息)。
如果在Future後呼叫了f1.await()或f1.get(),也成同步的了,因為await將等待worker返回後再繼續往下執行。
Future f1 = Futures.future(new Callable() {
Object call() {
def result = worker.sendRequestReply(o) //將訊息發給worker actor
worker.stop()
println "Worker Return----" + result
clientChannel.sendOneWay(result)
return result
}
})
println "Future call over" + f1.get()
伺服器日誌如下:
[GENERIC] [11-10-6 下午10:06] [RemoteServerStarted([email protected])]
[DEBUG] [11-10-6 下午10:06] [main] [HelloMaster] started
[GENERIC] [11-10-6 下午10:07] [RemoteServerClientConnected([email protected],Some(/10.68.15.113:53571))]
Master接收到Work訊息:Hello--0
[DEBUG] [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 收到訊息----Hello--0
Worker 將訊息Hello--0處理完成
[DEBUG] [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-5] [HelloWorker] stopping
Worker Return----result----Hello--0 。
Future call overresult----Hello--0 。
Master接收到Work訊息:Hello--2
Worker 收到訊息----Hello--2
[DEBUG] [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 將訊息Hello--2處理完成
Worker Return----result----Hello--2 。
Future call overresult----Hello--2 。
Master接收到Work訊息:Hello--3
[DEBUG] [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-10] [HelloWorker] stopping
[DEBUG] [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 收到訊息----Hello--3
Worker 將訊息Hello--3處理完成
Worker Return----result----Hello--3 。
Future call overresult----Hello--3 。
Master接收到Work訊息:Hello--4
[DEBUG] [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-14] [HelloWorker] stopping
[DEBUG] [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 收到訊息----Hello--4
Worker 將訊息Hello--4處理完成
Worker Return----result----Hello--4 。
Future call overresult----Hello--4 。
Master接收到Work訊息:Hello--1
[DEBUG] [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-18] [HelloWorker] stopping
[DEBUG] [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 收到訊息----Hello--1
Worker 將訊息Hello--1處理完成
Worker Return----result----Hello--1 。
Future call overresult----Hello--1 。
[DEBUG] [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-21] [HelloWorker] stopping
Master接收到Work訊息:Hello--6
[DEBUG] [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-24] [HelloWorker] started
Worker 收到訊息----Hello--6
Worker 將訊息Hello--6處理完成
Worker Return----result----Hello--6 。
Future call overresult----Hello--6 。
Master接收到Work訊息:Hello--5
[DEBUG] [11-10-6 下午10:09] [akka:event-driven:dispatcher:global-26] [HelloWorker] stopping
Worker 收到訊息----Hello--5
[DEBUG] [11-10-6 下午10:09] [akka:event-driven:dispatcher:global-24] [HelloWorker] started
需要注意的是,Akka預設使用環境變數%AKKA_HOME%/config/akka.conf配置,預設配置是client的read-timeout = 10(客戶端連線10秒後將自動斷開,這時服務端再給客戶端發訊息就釋出了了。報RemoteServerWriteFailed異常),可以將值設為0,將一直連著不斷開。
actor的timeout預設為5秒,也太短了,延長(不能設為0,0為總是超時).
7.參考文件
2)akka簡介 http://blog.chinaunix.net/uid-25885064-id-3400549.html
4)actors模型 http://janeky.iteye.com/blog/1504125
5)示例 http://www.th7.cn/Program/java/2012/03/29/67015.shtml
6)akka資料 http://www.jdon.com/tags/10531