1. 程式人生 > >akka學習教程(十四) akka分散式實戰

akka學習教程(十四) akka分散式實戰

akka系列文章目錄

上一篇文章介紹了akka叢集的搭建,現在假如服務的生產者與消費者兩個角色,模擬真實的服務呼叫。
本篇文章主要參考 使用Akka構建叢集(二)

整體架構

服務端三個服務,埠為2552,2553,2551;客戶端有兩個:2554,2555
服務端角色為[server];客戶端角色為[client]

服務端

叢集角色

首先配置服務端叢集角色為[server]:

akka {
  loglevel = "INFO"

  actor {
      provider = "akka.cluster.ClusterActorRefProvider"
} remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 2551 } } cluster { seed-nodes = [ "akka.tcp://[email protected]:2551", "akka.tcp://[email protected]:2552"] #//#snippet # excluded from snippet
auto-down-unreachable-after = 10s #//#snippet # auto downing is NOT safe for production deployments. # you may want to use it during development, read more about it in the docs. # auto-down-unreachable-after = 10s roles = [server] # Disable legacy metrics in akka-cluster.
metrics.enabled=off } } # 持久化相關 akka.persistence.journal.plugin = "akka.persistence.journal.inmem" # Absolute path to the default snapshot store plugin configuration entry. akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

定義通訊的資料結構

實際開發過程中,可以將該資料結構作為單獨的介面供服務端和客戶端引用。

package akka.myCluster;

import java.io.Serializable;

public class TransformationMessages {

    /**
     * 傳遞的資料(引數)
     */
    public static class TransformationJob implements Serializable {
        private final String text;  

        public TransformationJob(String text) {  
            this.text = text;  
        }  

        public String getText() {  
            return text;  
        }  
    }

    /**
     * 返回結果
     */
    public static class TransformationResult implements Serializable {  
        private final String text;  

        public TransformationResult(String text) {  
            this.text = text;  
        }  

        public String getText() {  
            return text;  
        }  

        @Override  
        public String toString() {  
            return "TransformationResult(" + text + ")";  
        }  
    }

    /**
     * 異常處理
     */
    public static class JobFailed implements Serializable {  
        private final String reason;  
        private final TransformationJob job;  

        public JobFailed(String reason, TransformationJob job) {  
            this.reason = reason;  
            this.job = job;  
        }  

        public String getReason() {  
            return reason;  
        }  

        public TransformationJob getJob() {  
            return job;  
        }  

        @Override  
        public String toString() {  
            return "JobFailed(" + reason + ")";  
        }  
    }  

    /**
     * 用於服務端向客戶端註冊
     */
    public static final int BACKEND_REGISTRATION = 1;

}  

真正服務端業務邏輯處理程式碼:簡單講收到的字串轉為大寫返回

package akka.myCluster;

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import com.typesafe.config.ConfigFactory;

import static akka.myCluster.TransformationMessages.BACKEND_REGISTRATION;

public class MyAkkaClusterServer extends UntypedActor {

    LoggingAdapter logger = Logging.getLogger(getContext().system(), this);

    Cluster cluster = Cluster.get(getContext().system());

    // subscribe to cluster changes  
    @Override  
    public void preStart() {  
        // #subscribe  
        cluster.subscribe(getSelf(), ClusterEvent.MemberUp.class);
        // #subscribe  
    }  

    // re-subscribe when restart  
    @Override  
    public void postStop() {  
        cluster.unsubscribe(getSelf());  
    }  

    @Override  
    public void onReceive(Object message) {
        if (message instanceof TransformationMessages.TransformationJob) {
            TransformationMessages.TransformationJob job = (TransformationMessages.TransformationJob) message;
            logger.info(job.getText());
            getSender().tell(new TransformationMessages.TransformationResult(job.getText().toUpperCase()), getSelf());

        } else if (message instanceof ClusterEvent.CurrentClusterState) {
            /**
             * 當前節點在剛剛加入叢集時,會收到CurrentClusterState訊息,從中可以解析出叢集中的所有前端節點(即roles為frontend的),並向其傳送BACKEND_REGISTRATION訊息,用於註冊自己
             */
            ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState) message;
            for (Member member : state.getMembers()) {
                if (member.status().equals(MemberStatus.up())) {
                    register(member);
                }
            }

        } else if (message instanceof ClusterEvent.MemberUp) {
            /**
             * 有新的節點加入
             */
            ClusterEvent.MemberUp mUp = (ClusterEvent.MemberUp) message;
            register(mUp.member());

        } else {
            unhandled(message);
        }

    }

    /**
     * 如果是客戶端角色,則像客戶端註冊自己的資訊。客戶端收到訊息以後會講這個服務端存到本機服務列表中
     * @param member
     */
    void register(Member member) {
        if (member.hasRole("client"))
            getContext().actorSelection(member.address() + "/user/myAkkaClusterClient").tell(BACKEND_REGISTRATION, getSelf());
    }

    public static void main(String [] args){
        System.out.println("Start MyAkkaClusterServer");
        ActorSystem system = ActorSystem.create("akkaClusterTest", ConfigFactory.load("reference.conf"));
        system.actorOf(Props.create(MyAkkaClusterServer.class), "myAkkaClusterServer");
        System.out.println("Started MyAkkaClusterServer");

    }
}

如何保證服務發現與維護

上面程式碼已經有註釋了,有2點:
- 有新節點加入時,如果是客戶端角色,則像客戶端註冊自己的資訊。客戶端收到訊息以後會講這個服務端存到本機服務列表中
- 服務端當前節點在剛剛加入叢集時,會收到CurrentClusterState訊息,從中可以解析出叢集中的所有前端節點(即roles為frontend的),並向其傳送BACKEND_REGISTRATION訊息,用於註冊自己

客戶端

修改客戶端roles為client

akka {
  loglevel = "INFO"

  actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }
    remote {
      log-remote-lifecycle-events = off
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2554
      }
    }

    cluster {
      seed-nodes = [
        "akka.tcp://[email protected]:2551",
        "akka.tcp://[email protected]:2552"]

      #//#snippet
      # excluded from snippet
      auto-down-unreachable-after = 10s
      #//#snippet
      # auto downing is NOT safe for production deployments.
      # you may want to use it during development, read more about it in the docs.
      #
      auto-down-unreachable-after = 10s

    roles = [client]
      # Disable legacy metrics in akka-cluster.
      metrics.enabled=off
    }
}

# 持久化相關
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
# Absolute path to the default snapshot store plugin configuration entry.
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

客戶端程式碼

package akka.myCluster;

import akka.actor.*;
import akka.dispatch.OnSuccess;
import akka.util.Timeout;
import com.typesafe.config.ConfigFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static akka.myCluster.TransformationMessages.BACKEND_REGISTRATION;
import static akka.pattern.Patterns.ask;

public class MyAkkaClusterClient extends UntypedActor {

    List<ActorRef> backends = new ArrayList<ActorRef>();
    int jobCounter = 0;

    @Override
    public void onReceive(Object message) {
        if ((message instanceof TransformationMessages.TransformationJob) && backends.isEmpty()) {//無服務提供者
            TransformationMessages.TransformationJob job = (TransformationMessages.TransformationJob) message;
            getSender().tell(
                    new TransformationMessages.JobFailed("Service unavailable, try again later", job),
                    getSender());

        } else if (message instanceof TransformationMessages.TransformationJob) {
            TransformationMessages.TransformationJob job = (TransformationMessages.TransformationJob) message;
            /**
             * 這裡在客戶端業務程式碼裡進行負載均衡操作。實際業務中可以提供多種負載均衡策略,並且也可以做分流限流等各種控制。
             */
            jobCounter++;
            backends.get(jobCounter % backends.size())
                    .forward(job, getContext());

        } else if (message == BACKEND_REGISTRATION) {
            /**
             * 註冊服務提供者
             */
            getContext().watch(getSender());//這裡對服務提供者進行watch
            backends.add(getSender());

        } else if (message instanceof Terminated) {
            /**
             * 移除服務提供者
             */
            Terminated terminated = (Terminated) message;
            backends.remove(terminated.getActor());

        } else {
            unhandled(message);
        }
    }

    public static void main(String [] args){
        System.out.println("Start myAkkaClusterClient");
        ActorSystem actorSystem = ActorSystem.create("akkaClusterTest", ConfigFactory.load("reference.conf"));
        final ActorRef myAkkaClusterClient = actorSystem.actorOf(Props.create(MyAkkaClusterClient.class), "myAkkaClusterClient");
        System.out.println("Started myAkkaClusterClient");

        final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS);
        final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
        final ExecutionContext ec = actorSystem.dispatcher();
        final AtomicInteger counter = new AtomicInteger();

        actorSystem.scheduler().schedule(interval, interval, new Runnable() {
            public void run() {
                ask(myAkkaClusterClient, new TransformationMessages.TransformationJob("hello-" + counter.incrementAndGet()), timeout)
                        .onSuccess(new OnSuccess<Object>() {
                            public void onSuccess(Object result) {
                                System.out.println(result.toString());
                            }
                        }, ec);
            }
        }, ec);

    }
}

可以看到TransformationFrontend處理的訊息分為以下三種:

  • BACKEND_REGISTRATION:收到此訊息說明有服務端通知客戶端,TransformationFrontend首先將服務端的ActorRef加入backends列表,然後對服務端的ActorRef新增監管;
  • Terminated:由於TransformationFrontend對服務端的ActorRef添加了監管,所以當服務端程序奔潰或者重啟時,將收到Terminated訊息,此時TransformationFrontend將此服務端的ActorRef從backends列表中移除;
  • TransformationJob:此訊息說明有新的轉換任務需要TransformationFrontend處理,處理分兩種情況:
    • backends列表為空,則向傳送此任務的傳送者返回JobFailed訊息,並告知“目前沒有服務端可用,請稍後再試”;
    • backends列表不為空,則通過取模運算選出一個服務端,將TransformationJob轉發給服務端進一步處理;

執行結果

啟動3個服務端,2個客戶端

服務端2551輸出:

[INFO] [01/18/2017 17:01:57.167] [akkaClusterTest-akka.actor.default-dispatcher-20] [akka://akkaClusterTest/user/myAkkaClusterServer] hello-4
[INFO] [01/18/2017 17:02:02.438] [akkaClusterTest-akka.actor.default-dispatcher-20] [akka://akkaClusterTest/user/myAkkaClusterServer] hello.2555-4
[INFO] [01/18/2017 17:02:03.124] [akkaClusterTest-akka.actor.default-dispatcher-3] [akka://akkaClusterTest/user/myAkkaClusterServer] hello-7
[INFO] [01/18/2017 17:02:08.416] [akkaClusterTest-akka.actor.default-dispatcher-20] [akka://akkaClusterTest/user/myAkkaClusterServer] hello.2555-7
[INFO] [01/18/2017 17:02:09.137] [akkaClusterTest-akka.actor.default-dispatcher-18] [akka://akkaClusterTest/user/myAkkaClusterServer] hello-10
[INFO] [01/18/2017 17:02:14.414] [akkaClusterTest-akka.actor.default-dispatcher-17] [akka://akkaClusterTest/user/myAkkaClusterServer] hello.2555-10
[WARN] [01/18/2017 17:02:15.104] [akkaClusterTest-akka.remote.default-remote-dispatcher-6] 
[INFO] [01/18/2017 17:02:15.204] [akkaClusterTest-akka.actor.default-dispatcher-17] [akka://akkaClusterTest/user/myAkkaClusterServer] hello-13

服務端2552輸出:

[INFO] [01/18/2017 17:01:53.178] [akkaClusterTest-akka.actor.default-dispatcher-5] [akka://akkaClusterTest/user/myAkkaClusterServer] hello-2
[INFO] [01/18/2017 17:01:59.125] [akkaClusterTest-akka.actor.default-dispatcher-22] [akka://akkaClusterTest/user/myAkkaClusterServer] hello-5
[INFO] [01/18/2017 17:02:00.433] [akkaClusterTest-akka.actor.default-dispatcher-17] [akka://akkaClusterTest/user/myAkkaClusterServer] hello.2555-3
[INFO] [01/18/2017 17:02:05.126] [akkaClusterTest-akka.actor.default-dispatcher-6] [akka://akkaClusterTest/user/myAkkaClusterServer] hello-8
[INFO] [01/18/2017 17:02:06.427] [akkaClusterTest-akka.actor.default-dispatcher-17] [akka://akkaClusterTest/user/myAkkaClusterServer] hello.2555-6
[INFO] [01/18/2017 17:02:11.130] [akkaClusterTest-akka.actor.default-dispatcher-5] [akka://akkaClusterTest/user/myAkkaClusterServer] hello-11
[INFO] [01/18/2017 17:02:12.420] [akkaClusterTest-akka.actor.default-dispatcher-17] [akka://akkaClusterTest/user/myAkkaClusterServer] hello.2555-9
[WARN] [01/18/2017 17:02:15.053] [akkaClusterTest-akka.remote.default-remote-dispatcher-7] 
[INFO] [01/18/2017 17:02:17.123] [akkaClusterTest-akka.actor.default-dispatcher-16] [akka://akkaClusterTest/user/myAkkaClusterServer] hello-14

服務端2553輸出:

[INFO] [01/18/2017 17:01:55.144] [akkaClusterTest-akka.actor.default-dispatcher-20] [akka://akkaClusterTest/user/myAkkaClusterServer] hello-3
[INFO] [01/18/2017 17:01:58.428] [akkaClusterTest-akka.actor.default-dispatcher-4] [akka://akkaClusterTest/user/myAkkaClusterServer] hello.2555-2
[INFO] [01/18/2017 17:02:01.130] [akkaClusterTest-akka.actor.default-dispatcher-16] [akka://akkaClusterTest/user/myAkkaClusterServer] hello-6
[INFO] [01/18/2017 17:02:04.413] [akkaClusterTest-akka.actor.default-dispatcher-16] [akka://akkaClusterTest/user/myAkkaClusterServer] hello.2555-5
[INFO] [01/18/2017 17:02:07.141] [akkaClusterTest-akka.actor.default-dispatcher-5] [akka://akkaClusterTest/user/myAkkaClusterServer] hello-9
[INFO] [01/18/2017 17:02:10.413] [akkaClusterTest-akka.actor.default-dispatcher-18] [akka://akkaClusterTest/user/myAkkaClusterServer] hello.2555-8
[INFO] [01/18/2017 17:02:13.128] [akkaClusterTest-akka.actor.default-dispatcher-18] [akka://akkaClusterTest/user/myAkkaClusterServer] hello-12

客戶端2554輸出

JobFailed(Service unavailable, try again later)
TransformationResult(HELLO-2)
TransformationResult(HELLO-3)
TransformationResult(HELLO-4)
TransformationResult(HELLO-5)
TransformationResult(HELLO-6)
TransformationResult(HELLO-7)
TransformationResult(HELLO-8)
TransformationResult(HELLO-9)
TransformationResult(HELLO-10)
TransformationResult(HELLO-11)
TransformationResult(HELLO-12)

客戶端2555輸出

JobFailed(Service unavailable, try again later)
TransformationResult(HELLO.2555-2)
TransformationResult(HELLO.2555-3)
TransformationResult(HELLO.2555-4)
TransformationResult(HELLO.2555-5)
TransformationResult(HELLO.2555-6)
TransformationResult(HELLO.2555-7)
TransformationResult(HELLO.2555-8)
TransformationResult(HELLO.2555-9)
TransformationResult(HELLO.2555-10)

可以發現,客戶端傳送的訊息被服務端正確消費,並且進行了負載均衡。不過上面第一條訊息由於客戶端節點剛開始處理訊息時,backends列表裡還沒有快取好任何backend的ActorRef,所以報錯JobFailed(Service unavailable, try again later)

總結

與thrift一樣,使用akka需要自己進行服務的發現治理工作。但是著名的spark都完全依賴akka,所以我們在工作中是可以使用akka的。當然akka的一些概念比較困難,學習路線比較長,所以想要學會也需要一些時日,必須經過深入學習實戰才可。

參考資料

  • 書籍《java高併發程式設計》

相關推薦

akka學習教程() akka分散式實戰

akka系列文章目錄 上一篇文章介紹了akka叢集的搭建,現在假如服務的生產者與消費者兩個角色,模擬真實的服務呼叫。 本篇文章主要參考 使用Akka構建叢集(二) 整體架構 服務端三個服務,埠為2552,2553,2551;客戶端有兩個:

akka學習教程(二) Spring與Akka的整合

akka系列文章目錄 概述 近年來隨著Spark的火熱,Spark本身使用的開發語言Scala、用到的分散式記憶體檔案系統Tachyon(現已更名為Alluxio)以及基於Actor併發程式設計模型的Akka都引起了大家的注意。瞭解過Akk

akka學習教程(一)簡介

akka系列文章目錄 為什麼要用akka Akka提供可擴充套件的實時事務處理。 Akka是一個執行時與程式設計模型一致的系統,為以下目標設計: 垂直擴充套件(併發) 水平擴充套件(遠端呼叫) 高容錯 在Akka的世界裡,只有一個內容需要學

Scala學習天 Scala中作為介面的trait、在物件中混入trait程式碼實戰

1、java中有interface介面,scala裡有同樣功能的關鍵字trait,trait的功能比interface功能強大很多。 2、trait和Java中的有所不同 interface中只能定義abstarct public方法 而trait中可以定義具

Tensorflow實戰學習(三)【實現Word2Vec】

卷積神經網路發展趨勢。Perceptron(感知機),1957年,Frank Resenblatt提出,始祖。Neocognitron(神經認知機),多層級神經網路,日本科學家Kunihiko fukushima,20世紀80年代提出,一定程度視覺認知功能,啟發

akka學習教程(二)HelloWord

akka系列文章目錄 注意:新版本的akka需要使用jdk8 裡面有兩個Actor: package sample.hello; import akka.actor.

akka學習教程(六) 路由器Router

akka系列文章目錄 通常在分散式任務排程系統中會有這樣的需求:一組actor提供相同的服務,我們在呼叫任務的時候只需要選擇其中一個actor進行處理即可。 其實這就是一個負載均衡或者說路由策略,akka作為一個高效能支援併發的actor模型,可以用來作

CSS學習)-CSS顏色之中的一個

rac 顏色 opacity bsp pac pre alpha color data- 一、理論: 1.RGB色彩模式 a.CMYK色彩模式 b.索引色彩模式 (主要用於web) c.灰度模式 d.雙色調模式 2.opacity: a.alphavalue:透明

python學習節(正則)

image all flags 正則 asdf alt afa images lag python2和python3都有兩種字符串類型strbytes re模塊find一類的函數都是精確查找。字符串是模糊匹配 findall(pattern,string,flags) r

kvm虛擬化學習筆記()之kvm虛擬機靜態遷移

虛擬主機 kvm 虛擬機遷移 kvm虛擬化 這裏提到的靜態遷移同是基於KVM虛擬主機之間的遷移,非異構虛擬化平臺的靜態遷移。1.靜態遷移就是虛擬機在關機狀態下,拷貝虛擬機虛擬磁盤文件與配置文件到目標虛擬主機中,實現的遷移。(1)虛擬主機各自使用本地存儲存放虛擬機磁盤文件本文實現基於本地磁盤存儲

linux系統學習天-<<工程師技術>>

linux工程師技術 linux管理員技術 linux雲計算 深圳雲計算王森 雲計算運維工程師 RAID磁盤陣列 ? 廉價冗余磁盤陣列 – Redundant Arrays of Inexpensive Disks – 通過硬件/軟件技術,將多個較小/低速的磁盤整合成一 個大磁盤 –

JMeter學習)jmeter_斷言使用

cnblogs amp one 理解 等於 ring 查點 sample 希望 先說一下使用斷言的目的:在request的返回層面增加一層判斷機制。因為request成功了,並不代表結果一定正確。類似於QTP中的檢查點檢查點。斷言的使用方法:step_1:在你選擇的Samp

odoo10學習筆記:mixin其他功能模塊

idg 其他 www 有用 read http 消息系統 pan div 原文地址:http://www.cnblogs.com/ygj0930/p/7153680.html odoo提供了許多有用的功能,比如:討論、通知、網站等。我們可以在開發自己的模塊時,引入這些功能。

學習

linux學習六周第一次課(3月12日)9.1 正則介紹_grep上9.2 grep中9.3 grep下擴展把一個目錄下,過濾所有.php文檔中含有eval的行grep -r --include=".php" ‘eval‘ /data/正則介紹_grep上grep上-r 遍歷一遍所有子目錄

學習

linux學習11.1 LAMP架構介紹11.2 MySQL、MariaDB介紹11.3/11.4/11.5 MySQL安裝擴展mysql5.5源碼編譯安裝 http://www.aminglinux.com/bbs/thread-1059-1-1.htmlmysql5.7二進制包安裝(變化較大) http:

學習

linux學習十四周四次課(5月14日)16.1 Tomcat介紹16.2 安裝jdk16.3 安裝Tomcat 擴展java容器比較 http://my.oschina.net/diedai/blog/271367 http://www.360doc.com/content/11/0618/21/16915

學習

linux學習5月30日任務20.5 shell腳本中的邏輯判斷20.6 文件目錄屬性判斷20.7 if特殊用法20.8/20.9 case判斷 shell腳本中的邏輯判斷 格式1:if 條件 ; then 語句; fi格式2:if 條件; then 語句; else 語句; fi格式3:if …; then

Java學習筆記:如何定義Java中的類以及使用對象的屬性

lte 類的屬性 一個 pri text 新的 oid ali tail 如何定義Java中的類以及使用對象的屬性 一:類的重要性; 所有Java程序都以類class為組織單元 二:什麽是類; 類是模子,確定對象將會擁有的特征(屬性)和行為(方法); 三:類的

C++語言學習)——C++類成員函數調用分析

不可訪問 ring error: 兩種 cout list 空間 splay 示例代碼 C++語言學習(十四)——C++類成員函數調用分析 一、C++成員函數 1、C++成員函數的編譯 C++中的函數在編譯時會根據命名空間、類、參數簽名等信息進行重新命名,形成新的函數名。函

IDA Pro 權威指南學習筆記() - 操縱函數

禁用 not 當前 函數 reg 代碼區 字節 strong 掃描 IDA 無法定位一個函數調用,由於沒有直接的方法到達函數,IDA 將無法識別它們 IDA 可能無法正確確定函數的結束部分,需要手動幹預,以更正反匯編代碼中的錯誤 如果編譯器已經將函數分割到幾個地址範圍,