用AKKA實現簡單的RPC通訊模型2
上一篇文章,簡單地用AKKA實現了RPC通訊,這篇文章在之前的基礎之上,進行了更多地關於RPC通訊的操作,包括傳一些相關資料,包括心跳機制等
具體功能如下圖:
程式碼結構:
WorkerInfo.scala程式碼
package cn.heres.rpc
/**
* Created by vinsuan on 2017/6/15 0015.
*/
class WorkerInfo(val id: String,val memory:Int,val cores:Int) {
//todo 上一次心跳
var lastHeartBeatTime : Long = _
}
RemoteMessage.scala程式碼
package cn.heres.rpc /** * Created by vinsuan on 2017/6/15 0015. */ trait RemoteMessage extends Serializable //worker->Master case class RegisterWorker(id:String , memory : Int ,cores:Int) extends RemoteMessage case class Heartbeat(id:String ) extends RemoteMessage //Master->worker case class RegisteredWorker(masterUrl: String) extends RemoteMessage //worker->self case object SendHeartbeat //Master->self case object CheckTimeOutWorker
Worker.scala程式碼:
package cn.heres.rpc
import java.util.UUID
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
/**
* Created by vinsuan on 2017/6/13 0013.
*/
class Worker(val masterHost : String ,val masterPost : Int,val memory: Int ,val cores :Int) extends Actor{
var master : ActorSelection = _
val workerId = UUID.randomUUID().toString
val HEARTBEAT_INTERVAL = 10000
//建立連線
override def preStart(): Unit = {
master = context.actorSelection(s"akka.tcp:// [email protected]$masterHost:$masterPost/user/Master")
master ! RegisterWorker(workerId,memory,cores)
}
override def receive: Receive = {
case RegisteredWorker(masterUrl) => {
println(masterUrl)
//匯入隱式轉換
import context.dispatcher
//啟動定時器傳送心跳
context.system.scheduler.schedule(0 millis,HEARTBEAT_INTERVAL millis,self,SendHeartbeat)
}
case SendHeartbeat => {
println("send heartbeat to master")
master ! Heartbeat(workerId)
}
}
}
object Worker{
def main(args: Array[String]) {
val host = args(0)
val port = args(1).toInt
val masterHost = args(2)
val masterPort = args(3).toInt
val memory = args(4).toInt
val cores = args(5).toInt
//準備配置
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
val config = ConfigFactory.parseString(configStr)
//ActorSystem 老大,輔助建立和監控下面的Actor ,它是單例的
val actorSystem = ActorSystem("workerSystem",config)
//建立Actor
val worker = actorSystem.actorOf(Props(new Worker(masterHost,masterPort,memory,cores)),"Worker")
actorSystem.awaitTermination()
}
}
Maser.scala程式碼:
package cn.heres.rpc
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
/**
* Created by vinsuan on 2017/6/13 0013.
*/
class Master(val host:String,val port:Int) extends Actor {
//workerId -> workerInfo
val idtoWorker = new scala.collection.mutable.HashMap[String, WorkerInfo]()
//workerInfo
val workers = new scala.collection.mutable.HashSet[WorkerInfo]()
val CHECK_INTERVAL = 15000
//超時檢測的間隔
override def preStart(): Unit = {
println("preStart invoked")
//匯入隱式轉換
import context.dispatcher
context.system.scheduler.schedule(0 millis, CHECK_INTERVAL millis, self, CheckTimeOutWorker)
}
//用於接收訊息
override def receive: Receive = {
case RegisterWorker(id, memory, cores) => {
//判斷一下,是不是已經註冊過
if (!idtoWorker.contains(id)) {
//把Worker的資訊封裝起來儲存到記憶體中
val workerInfo = new WorkerInfo(id, memory, cores)
idtoWorker(id) = workerInfo
workers += workerInfo
sender ! RegisteredWorker(s"akka.tcp://[email protected]$host:$port/user/Master")
}
}
case Heartbeat(id) => {
if (idtoWorker.contains(id)) {
val workerInfo = idtoWorker(id)
//報活
val currentTime = System.currentTimeMillis()
workerInfo.lastHeartBeatTime = currentTime
}
}
case CheckTimeOutWorker => {
val currentTime = System.currentTimeMillis()
val toRemove = workers.filter(x => currentTime - x.lastHeartBeatTime > CHECK_INTERVAL)
for (w <- toRemove) {
workers -= w
idtoWorker -= w.id
}
println("還有幾個活著的worker:" + workers.size)
}
}
}
object Master {
def main(args: Array[String]) {
val host = args(0)
val port = args(1).toInt
//準備配置
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
val config = ConfigFactory.parseString(configStr)
//ActorSystem 老大,輔助建立和監控下面的Actor ,它是單例的
val actorSystem = ActorSystem("MasterSystem", config)
//建立Actor
val master = actorSystem.actorOf(Props(new Master(host, port)), "Master")
actorSystem.awaitTermination()
}
}
相關推薦
用AKKA實現簡單的RPC通訊模型2
上一篇文章,簡單地用AKKA實現了RPC通訊,這篇文章在之前的基礎之上,進行了更多地關於RPC通訊的操作,包括傳一些相關資料,包括心跳機制等 具體功能如下圖: 程式碼結構: WorkerInfo.scala程式碼 package cn.heres.rpc /**
Scala學習筆記(10)—— Akka 實現簡單 RPC 框架
1 Akka 介紹 目前大多數的分散式架構底層通訊都是通過RPC實現的,RPC框架非常多,比如前我們學過的Hadoop專案的RPC通訊框架,但是Hadoop在設計之初就是為了執行長達數小時的批量而設計的,在某些極端的情況下,任務提交的延遲很高,所有Hadoop的
linux網路程式設計之用socket實現簡單客戶端和服務端的通訊(基於TCP)
一、介紹基於TCP協議通過socket實現網路程式設計常用API 1、讀者如果不是很熟悉,可以先看我之前寫的幾篇部落格,有socket,地址結構的理解,更加方便讀者理解 地址分別是: 2、socket(TCP)程式設計API簡介 1)、socket int s
linux網路程式設計之用socket實現簡單客戶端和服務端的通訊(基於UDP)
1、sendto和recvfrom函式介紹 sendto(經socket傳送資料) 相關函式 send , sendmsg,recv , recvfrom , socket 表頭檔案 #include < sys/types.h >#includ
分散式Web應用----基於Socket+動態代理實現簡單RPC 生產者消費者模型
寫在前面 前面一文主要簡單介紹了JAVA動態代理基本原理,這也是實現RPC的基本知識,這裡我們運用Socket簡單實現一個遠端過程呼叫,方便大家理解RPC的基本原理,希望對大家有所幫助。 新建People介面類與Man實現類 介面類 pu
用java實現簡單快速的webservice客戶端/資料採集器(支援soap1.1和soap1.2標準,支援utf-8編碼)
前言: 用了cxf,axis等各種wbeservice實現庫,簡單試用了一下動態呼叫的方式,很不滿意,完全無法滿足業務的需要,所以自己實現了一個webservice採集客戶端,方便動態呼叫外部webservice介面。 一、實現的功能 1、soap1.1客戶端(soap1.
用BlockingQueue實現簡單的生產者-消費者模型
package com.example.test; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurren
用jQuery實現簡單的DOM操作
追加 匹配 簡單的 rep 兄弟節點 子元素 spa 添加元素 新建 通過jQuery創建元素節點:$oLi = $("<li></li>");這樣我們就創建了一個li標簽 如果想在元素節點中添加文本的話也挺簡單:$oLi = $("<li&g
用jq實現簡單輪播
cti 標簽 function radi har mil ott ava 懸停 一個簡單的實例: css代碼: <style type="text/css">@charset "utf-8";*{ padding:0px; border:
用javascript實現簡單的用戶登錄驗證
-c turn get https 沒有 驗證 utf-8 .com head 1 <!DOCTYPE html> 2 <html lang="en"> 3 <head> 4 <meta charset="
【SSH學習筆記】用Struts2實現簡單的用戶登錄
utf-8 png rds href -a his ets 屬性 url 準備階段 在使用學習Struts2的時候首先要下載相應的架包 Struts2資源下載 這裏建議下載第一個,在struts-2.5.14.1-all.zip裏有很多實用的東西,不僅有架包還有官方為開發
神級程序員教你用Python實現簡單的導彈自動追蹤!此乃裝逼神技!
大致 範圍 發現 完美 容易 game 分析 iss 兩個 由於待會要用pygame演示,他的坐標系是y軸向下,所以這裏我們也用y向下的坐標系。 計算sina和cosa,正弦對比斜,余弦鄰比斜,斜邊可以利用兩點距離公式計算出,即: 於是 AC的長度就是導彈的速度乘
基於select類型多路IO復用,實現簡單socket並發
清理 就是 ive class sockets true 簡單 Coding conn 還有很多缺限,如客戶斷開無限重復 以下轉至老師博客: server: #!/usr/bin/env python # -*- coding: utf-8 -*- __author__
BootNettyRpc:采用 Netty 實現的 RPC 框架
ofo 文件配置 RR 實現 端口 監控 ble tin cto 什麽是 BootNettyRpc?BootNettyRpc 是一個采用Netty實現的Rpc框架,適用於Spring Boot項目,支持Spring Cloud。 目前支持的版本為Spring Boot 1.
用Python實現簡單的名片管理系統
rem 代碼 card 刪除 import val 字典 fin pytho 首先新建項目,包含主程序cards_main和工具庫cards_tools: 如下:main函數主主要使用while和if實現4功能切換,切換的具體操作由cards_tools中的函數執行。不說了
linux網絡編程之用socket實現簡單客戶端和服務端的通信(基於UDP)
服務端 msg ets lin fgets err n) stderr tcp 單客戶端和服務端的通信(基於UDP) 代碼 服務端代碼socket3.c #include<sys/types.h> #include<sys/socket.h>
用 JavaScript 實現簡單拼圖遊戲
本篇主要講解,如何利用原生的 JavaScript 來實現一個簡單的拼圖小遊戲。 線上體驗地址:拼圖 一、遊戲的基礎邏輯 想用一門語言來開發遊戲,必須先了解如何使用這門語言來實現一些基礎邏輯,比如影象的繪製、互動的處理、定時器等。 1、圖形繪製 圖形繪製是一切的基礎,這裡使用 Ja
SpringCloud(一) 用springboot實現簡單服務呼叫
分享一下我老師大神的人工智慧教程吧。零基礎,通俗易懂!風趣幽默!http://www.captainbed.net/ 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!
用JavaScript實現簡單的驗證碼
用JavaScript實現簡單的驗證碼 先展示一下最終的效果圖
Tensorflow 實現簡單線性迴歸模型
Tensorflow是深度學習常用的一個框架,從目前官方文件看,Tensorflow支援CNN、RNN和LSTM演算法,這都是目前在Image,Speech和NLP領域最流行的深度神經網路模型。 為了熟悉和理解tensor