1. 程式人生 > >執行緒排程(Dispatcher)

執行緒排程(Dispatcher)

       在前面的章節中,我們已經瞭解到Actor的基本知識點,例如建立方式、生命週期、訊息通訊、監控、容錯。關於actor通訊,大家都很熟悉,通過tell和ask就可以實現,但是,現在我想跟大家談論一下actor的訊息是如何進行排程的。

     Dispatcher

       在Akka中,為了保證訊息處理的及時性和執行緒的使用效率,Dispatcher對執行緒池做了一些協調工作。簡單來說,dispatcher就相當於一個訊息控制中心,負責對訊息進行分發和派送。由此可見,合理運用Dispatcher可以提升系統的吞吐量。

       在實際專案中,我們可能會遇到許多不同情況的任務,但大致可以分為兩種:1.耗時長或長時間阻塞。2.耗時短,不會造成阻塞。針對不同的情況,在Actor系統中,我們可以選擇不同的dispatcher進行處理,從而做到執行緒資源隔離。

     Executor

       在具體講解Dispatcher使用方式之前,我們先了解一下它的核心內容-執行緒池(Executor),它提供了執行非同步任務的策略,分為兩種:

執行緒池

描述

thread-pool-executor

基於普通的執行緒池,它有一個工作佇列(儲存任務),當執行緒空閒時會從佇列中獲取任務並執行。

fork-join-executor

基於工作竊取的執行緒池(採用分而治之原理),它把大的任務拆分成小的任務然後並行執行,最後合併並結果,當某執行緒的任務佇列沒有任務時,會主動從其它執行緒的佇列中獲取任務。fork-join效能更佳,Akka預設選項。

     Akka檔案配置

       這裡先插入一個小知識點,我們先簡單學習一下Akka的檔案配置和載入。在預設情況下,程式會載入classpath下application.conf、application.json、application.properties檔案。這裡使用application.properties示例:

akka.log-config-on-start=on

       這裡log-config-on-start配置成on,表示啟動時(即呼叫ActorSystem.create()方法)會列印配置資訊。

       這是單檔案的配置方式,還可以進行多檔案配置。在大型專案中,配置檔案可能需要拆分成多個檔案,在Akka中,我們可以include的方式引入其它配置檔案。現在我們有配置檔案application.properties,system.properties檔案,例如:

       system.properties    

include "application"

akka.loglevel=debug

       這裡system.properties引入了application配置檔案,並增加了loglevel配置。在建立ActorSystem時,我們可以顯示載入配置檔案,例如:

 ActorSystem system = ActorSystem.create("system", ConfigFactory.load("system"));

     Dispatcher配置

       在Actor系統中,ActorSystem會採用一個使用fork-join-executor的dispatcher預設配置。當然,如果我們想要自定義配置,也不是很複雜。下面,定義一個config-fork-join-dispatcher,如下:

config-fork-join-dispatcher{
    type=Dispatcher
    executor="fork-join-executor"
    fork-join-executor{
        parallelism-min=3
        parallelism-factor=3.0
        parallelism-max=15
    }
    throughput=1
}

     關於上述所使用到的引數,詳細資訊如下:

配置項

描述

config-fork-join-dispatcher

自定義dispatcher名稱,Actor通過該名稱配置dispatcher。

type

型別,除了dispatcher,還有PinnedDispatcher,CallingThreadDispatcher。

executor

選擇非同步執行任務策略,也就是使用何種型別的執行緒池。

parallelism-min

最小併發執行緒數。

parallelism-factor

併發因子,用於計算最大執行緒數,最大執行緒數=處理器個數*併發因子。在這裡併發因子為3,假如處理器個數為4,最大執行緒數則是12。

parallelism-max

最大執行緒數,但是在實際執行過程中,最大執行緒數=min(parallelism-max,處理器個數*併發因子),也就是最小值的那個。

throughput

表明某個執行緒在放回執行緒池之前所能處理的訊息數,這裡配置為1,表明儘可能公平的分配訊息

     thread-pool-executor的配置與fork-join-executor極其相似:

config-thread-pool-dispatcher{
    type=Dispatcher
    executor="thread-pool-executor"
    thread-pool-executor{
        #最小執行緒併發數
        core-pool-size-min=3
        #併發因子
        core-pool-size-factor=3.0
        #最大執行緒併發數
        core-pool-size-max=15
    }
    throughput=1
}

       關於thread-pool和fork-join的配置,到此為止。上面關於dispatcher的配置,我們放到system.conf檔案裡,現在我們建立一個Actor,使用一下它們:

public class ConfigActor extends AbstractActor {
    public static void main(String[] args) {
        //載入system配置檔案
        ActorSystem system = ActorSystem.create("system", ConfigFactory.load("system"));
        ActorRef configActor = system.actorOf(Props.create(ConfigActor.class).withDispatcher("config-fork-join-dispatcher"), "configActor");
        configActor.tell("hello", ActorRef.noSender());

    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().matchAny(other -> {
            System.out.println( other + " " + Thread.currentThread().getName());
        }).build();
    }
}

      執行結果:

hello system-config-fork-join-dispatcher-5

      從結果中,我們可以看出配置的Dispatcher已經生效,這裡採用withDispatcher("config-fork-join-dispatcher")。使用dispatcher還有一種簡單的方式,那就是基於配置檔案,例如:

akka.actor.deployment{
   /configActor{
        dispatcher=config-fork-join-dispatcher
   }
}

     PinnedDispatcher

        PinnedDispatcher是另外一種Dispatcher型別,它會為每個Actor提供只有一個執行緒的執行緒池,該執行緒池Actor獨有。使用方式如下:

        PinnedDispatcher的配置:

config-pinned-dispatcher{

     executor="thread-pool-executor"

     type=PinnedDispatcher

}

       在這裡,我們不需要再新增諸如上述的併發因子、最小執行緒數等引數,因為它會為每個Actor建立獨有的執行緒池。

       我們修改上面的示例,來模擬一下耗時的操作:

       修改createReceive()方法:

 public Receive createReceive() {
        return receiveBuilder().matchAny(other -> {
            System.out.println( other + " " + Thread.currentThread().getName());
            //沉睡4s,模擬耗時的操作
            Thread.sleep(4000);
        }).build();
    }

       然後建立大量的Actor,並給這些Actor傳送訊息:

for(int i=0;i<50;i++){
            ActorRef configActor = system.actorOf(Props.create(ConfigActor.class).withDispatcher("config-pinned-dispatcher"), "configActor"+i);
            configActor.tell("hello", ActorRef.noSender());
        }

       執行示例,大家應該可以發現,即使我們這裡模擬了耗時的操作,但是訊息傳送並沒有延遲,證明了PinnedDispatcher會給每個Actor建立一個獨立的執行緒池,Actor之間並不會相互影響。雖然PinnedDispatcher可以提升系統的吞吐量,但是大量的建立獨有執行緒池可能會耗盡伺服器的資源,所以大家在使用PinnedDispatcher之前,先對該耗時操作的併發量做好評估。

     總結      

       上述,我們講解了Actor訊息之間是如何進行分發的,在系統中,當我們沒有對Dispatcher進行配置時,系統也會採用預設的Dispatcher和效能最佳的fork-join-executor進行執行緒排程。當我們想提高Actor系統的吞吐量時,不妨根據系統情況對執行緒排程做最好的配置。