Akka-Cluster(1)- Cluster Singleton 單例節點
關於cluster-singleton我在前面的博文已經介紹過,在這篇我想回顧一下它的作用和使用方法。首先,cluster-singleton就是叢集某個節點上的一個actor。任何時間在叢集內保證只會有一個這種actor的例項。它可以是在任何節點上,具體位置由akka-cluster系統的leader節點根據一定規則選定。當cluster-singleton所處的節點停止運作時leader會選擇另一個節點,然後系統會將cluster-singleton遷移到新的節點上來保證叢集中一定有一個活著的cluster-singleton例項,不過值得注意的是遷移的actor會丟失它的內部狀態。在程式設計實踐中常常會需要保證一項程式功能只能由唯一的actor來執行的情況,比如我們需要保證某種運算的順序,這時在叢集環境裡就可以使用cluster-singleton了。下面是cluster-singleton可能的一些使用場景:
1、在叢集中的一個單點運算決策角色,或者是叢集各節點互動運算的協調角色
2、叢集與外界軟體唯一的介面點
3、單一主角,多個從屬
4、中央命名機制,或者中央路由邏輯
cluster-singleton的工作原理是:在叢集的所有節點上都部署一個能產生、啟動某singleton型別的ClusterSingletonManager,這樣可以保證singleton可以遷移到任何節點。叢集中的leader節點動態決定singleton的具體生存節點並指示該節點上的ClusterSingletonManager建立singleton例項。其它actor與singleton的互動是通過這個singleton型別的ClusterSingletonProxy進行的,這是cluster系統提供的一項與singleton互動的渠道服務,在需要接觸singleton時可以建立ClusterSingletonProxy例項,然後把它當作目標傳送操作訊息,這樣就可以保證cluster-singleton的位置透明特性了。
下面我們就用個簡單的例子來示範cluster-singleton的使用看看它的唯一性和自動遷移特點:
構建一個簡單的actor:
class SingletonActor extends Actor with ActorLogging { import SingletonActor._ import akka.cluster._ override def receive: Receive = { case Greeting(msg) => log.info("*********got {} from {}********", msg, sender().path.address) case Relocate => log.info("*********I'll move from {}********", self.path.address) val cluster = Cluster(context.system) cluster.leave(cluster.selfAddress) case Die => log.info("*******I'm shutting down ... ********") self ! PoisonPill } }
把SingletonActor包嵌在ClusterSingletonManager裡:
bject SingletonActor {
trait SingletonMsg {}
case class Greeting(msg: String) extends SingletonMsg
case object Relocate extends SingletonMsg
case object Die extends SingletonMsg
def props = Props(new SingletonActor)
def createSingleton(port: Int) = {
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
.withFallback(ConfigFactory.parseString("akka.cluster.roles=[singleton]"))
.withFallback(ConfigFactory.load())
val singletonSystem = ActorSystem("ClusterSingletonSystem",config)
val singletonManager = singletonSystem.actorOf(ClusterSingletonManager.props(
singletonProps = props,
terminationMessage = Die,
settings = ClusterSingletonManagerSettings(singletonSystem)
.withRole(Some("singleton")) //只部署在角色是singleton的節點上
),
name = "singletonManager"
)
}
}
注意:singletonManager就是一個actor,所以是用actorOf(...)來構建的。現在這個singletonManager只能部署在singleton角色的節點上。
呼叫SingletonActor是通過ClusterSingletonProxy進行的:
object SingletonUser {
import SingletonActor._
def sendToSingleton(msg: SingletonMsg) = {
val config = ConfigFactory.parseString("akka.cluster.roles=[greeter]")
.withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSingletonSystem",config)
val singletonProxy = system.actorOf(ClusterSingletonProxy.props(
"user/singletonManager",
ClusterSingletonProxySettings(system).withRole(None)
))
singletonProxy ! msg
}
}
withRole(None)代表singletonProxy可以部署在任何節點上。下面是測試程式碼:
import SingletonActor._
import SingletonUser._
object SingletonDemo extends App {
createSingleton(2551) //seednode
createSingleton(2552)
createSingleton(2553)
createSingleton(2554)
scala.io.StdIn.readLine()
sendToSingleton(Greeting("hello from tiger"))
scala.io.StdIn.readLine()
sendToSingleton(Relocate)
scala.io.StdIn.readLine()
sendToSingleton(Greeting("hello from cat"))
scala.io.StdIn.readLine()
sendToSingleton(Die)
scala.io.StdIn.readLine()
}
檢驗一下輸出簡要:
[INFO] [10/25/2018 13:41:25.642] [ClusterSingletonSystem-akka.actor.default-dispatcher-3] [akka.tcp://[email protected]:51660/user/$a] Singleton identified at [akka.tcp://[email protected]:2551/user/singletonManager/singleton]
[INFO] [10/25/2018 13:41:25.654] [ClusterSingletonSystem-akka.actor.default-dispatcher-20] [akka.tcp://[email protected]:2551/user/singletonManager/singleton] *********got hello from tiger from akka.tcp://[email protected]:51660********
[INFO] [10/25/2018 13:43:10.839] [ClusterSingletonSystem-akka.actor.default-dispatcher-3] [akka.tcp://[email protected]:2551/user/singletonManager/singleton] *********I'll move from akka://ClusterSingletonSystem********
INFO] [10/25/2018 13:43:18.885] [ClusterSingletonSystem-akka.actor.default-dispatcher-2] [akka.tcp://[email protected]:51670/user/$a] Singleton identified at [akka.tcp://[email protected]:2552/user/singletonManager/singleton]
[INFO] [10/25/2018 13:43:18.886] [ClusterSingletonSystem-akka.actor.default-dispatcher-3] [akka.tcp://[email protected]:2552/user/singletonManager/singleton] *********got hello from cat from akka.tcp://[email protected]:51670********
[INFO] [10/25/2018 13:43:18.156] [ClusterSingletonSystem-akka.actor.default-dispatcher-16] [akka.tcp://[email protected]:2551/user/singletonManager/singleton] *******I'm shutting down ... ********
[INFO] [10/25/2018 13:43:18.177] [ClusterSingletonSystem-akka.remote.default-remote-dispatcher-18] [akka.tcp://[email protected]:2551/system/remoting-terminator] Shutting down remote daemon.
[INFO] [10/25/2018 13:43:18.178] [ClusterSingletonSystem-akka.remote.default-remote-dispatcher-18] [akka.tcp://[email protected]:2551/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [10/25/2018 13:43:18.215] [ClusterSingletonSystem-akka.remote.default-remote-dispatcher-14] [akka.tcp://[email protected]:2551/system/remoting-terminator] Remoting shut down.
可以看到:singleton從2551開始,移到2552, 然後shutdown。