執行緒排程(Dispatcher)
在前面的章節中,我們已經瞭解到Actor的基本知識點,例如建立方式、生命週期、訊息通訊、監控、容錯。關於actor通訊,大家都很熟悉,通過tell和ask就可以實現,但是,現在我想跟大家談論一下actor的訊息是如何進行排程的。
Dispatcher
在Akka中,為了保證訊息處理的及時性和執行緒的使用效率,Dispatcher對執行緒池做了一些協調工作。簡單來說,dispatcher就相當於一個訊息控制中心,負責對訊息進行分發和派送。由此可見,合理運用Dispatcher可以提升系統的吞吐量。
在實際專案中,我們可能會遇到許多不同情況的任務,但大致可以分為兩種:1.耗時長或長時間阻塞。2.耗時短,不會造成阻塞。針對不同的情況,在Actor系統中,我們可以選擇不同的dispatcher進行處理,從而做到執行緒資源隔離。
Executor
在具體講解Dispatcher使用方式之前,我們先了解一下它的核心內容-執行緒池(Executor),它提供了執行非同步任務的策略,分為兩種:
執行緒池 |
描述 |
thread-pool-executor |
基於普通的執行緒池,它有一個工作佇列(儲存任務),當執行緒空閒時會從佇列中獲取任務並執行。 |
fork-join-executor |
基於工作竊取的執行緒池(採用分而治之原理),它把大的任務拆分成小的任務然後並行執行,最後合併並結果,當某執行緒的任務佇列沒有任務時,會主動從其它執行緒的佇列中獲取任務。fork-join效能更佳,Akka預設選項。 |
Akka檔案配置
這裡先插入一個小知識點,我們先簡單學習一下Akka的檔案配置和載入。在預設情況下,程式會載入classpath下application.conf、application.json、application.properties檔案。這裡使用application.properties示例:
akka.log-config-on-start=on
這裡log-config-on-start配置成on,表示啟動時(即呼叫ActorSystem.create()方法)會列印配置資訊。
這是單檔案的配置方式,還可以進行多檔案配置。在大型專案中,配置檔案可能需要拆分成多個檔案,在Akka中,我們可以include的方式引入其它配置檔案。現在我們有配置檔案application.properties,system.properties檔案,例如:
system.properties
include "application"
akka.loglevel=debug
這裡system.properties引入了application配置檔案,並增加了loglevel配置。在建立ActorSystem時,我們可以顯示載入配置檔案,例如:
ActorSystem system = ActorSystem.create("system", ConfigFactory.load("system"));
Dispatcher配置
在Actor系統中,ActorSystem會採用一個使用fork-join-executor的dispatcher預設配置。當然,如果我們想要自定義配置,也不是很複雜。下面,定義一個config-fork-join-dispatcher,如下:
config-fork-join-dispatcher{
type=Dispatcher
executor="fork-join-executor"
fork-join-executor{
parallelism-min=3
parallelism-factor=3.0
parallelism-max=15
}
throughput=1
}
關於上述所使用到的引數,詳細資訊如下:
配置項 |
描述 |
config-fork-join-dispatcher |
自定義dispatcher名稱,Actor通過該名稱配置dispatcher。 |
type |
型別,除了dispatcher,還有PinnedDispatcher,CallingThreadDispatcher。 |
executor |
選擇非同步執行任務策略,也就是使用何種型別的執行緒池。 |
parallelism-min |
最小併發執行緒數。 |
parallelism-factor |
併發因子,用於計算最大執行緒數,最大執行緒數=處理器個數*併發因子。在這裡併發因子為3,假如處理器個數為4,最大執行緒數則是12。 |
parallelism-max |
最大執行緒數,但是在實際執行過程中,最大執行緒數=min(parallelism-max,處理器個數*併發因子),也就是最小值的那個。 |
throughput |
表明某個執行緒在放回執行緒池之前所能處理的訊息數,這裡配置為1,表明儘可能公平的分配訊息 |
thread-pool-executor的配置與fork-join-executor極其相似:
config-thread-pool-dispatcher{
type=Dispatcher
executor="thread-pool-executor"
thread-pool-executor{
#最小執行緒併發數
core-pool-size-min=3
#併發因子
core-pool-size-factor=3.0
#最大執行緒併發數
core-pool-size-max=15
}
throughput=1
}
關於thread-pool和fork-join的配置,到此為止。上面關於dispatcher的配置,我們放到system.conf檔案裡,現在我們建立一個Actor,使用一下它們:
public class ConfigActor extends AbstractActor {
public static void main(String[] args) {
//載入system配置檔案
ActorSystem system = ActorSystem.create("system", ConfigFactory.load("system"));
ActorRef configActor = system.actorOf(Props.create(ConfigActor.class).withDispatcher("config-fork-join-dispatcher"), "configActor");
configActor.tell("hello", ActorRef.noSender());
}
@Override
public Receive createReceive() {
return receiveBuilder().matchAny(other -> {
System.out.println( other + " " + Thread.currentThread().getName());
}).build();
}
}
執行結果:
hello system-config-fork-join-dispatcher-5
從結果中,我們可以看出配置的Dispatcher已經生效,這裡採用withDispatcher("config-fork-join-dispatcher")。使用dispatcher還有一種簡單的方式,那就是基於配置檔案,例如:
akka.actor.deployment{
/configActor{
dispatcher=config-fork-join-dispatcher
}
}
PinnedDispatcher
PinnedDispatcher是另外一種Dispatcher型別,它會為每個Actor提供只有一個執行緒的執行緒池,該執行緒池Actor獨有。使用方式如下:
PinnedDispatcher的配置:
config-pinned-dispatcher{
executor="thread-pool-executor"
type=PinnedDispatcher
}
在這裡,我們不需要再新增諸如上述的併發因子、最小執行緒數等引數,因為它會為每個Actor建立獨有的執行緒池。
我們修改上面的示例,來模擬一下耗時的操作:
修改createReceive()方法:
public Receive createReceive() {
return receiveBuilder().matchAny(other -> {
System.out.println( other + " " + Thread.currentThread().getName());
//沉睡4s,模擬耗時的操作
Thread.sleep(4000);
}).build();
}
然後建立大量的Actor,並給這些Actor傳送訊息:
for(int i=0;i<50;i++){
ActorRef configActor = system.actorOf(Props.create(ConfigActor.class).withDispatcher("config-pinned-dispatcher"), "configActor"+i);
configActor.tell("hello", ActorRef.noSender());
}
執行示例,大家應該可以發現,即使我們這裡模擬了耗時的操作,但是訊息傳送並沒有延遲,證明了PinnedDispatcher會給每個Actor建立一個獨立的執行緒池,Actor之間並不會相互影響。雖然PinnedDispatcher可以提升系統的吞吐量,但是大量的建立獨有執行緒池可能會耗盡伺服器的資源,所以大家在使用PinnedDispatcher之前,先對該耗時操作的併發量做好評估。
總結
上述,我們講解了Actor訊息之間是如何進行分發的,在系統中,當我們沒有對Dispatcher進行配置時,系統也會採用預設的Dispatcher和效能最佳的fork-join-executor進行執行緒排程。當我們想提高Actor系統的吞吐量時,不妨根據系統情況對執行緒排程做最好的配置。