1. 程式人生 > >用AKKA實現簡單的RPC通訊模型2

用AKKA實現簡單的RPC通訊模型2

上一篇文章,簡單地用AKKA實現了RPC通訊,這篇文章在之前的基礎之上,進行了更多地關於RPC通訊的操作,包括傳一些相關資料,包括心跳機制等

具體功能如下圖:


程式碼結構:


WorkerInfo.scala程式碼

package cn.heres.rpc

/**
  * Created by vinsuan on 2017/6/15 0015.
  */
class WorkerInfo(val id: String,val memory:Int,val cores:Int) {
  //todo 上一次心跳
  var lastHeartBeatTime : Long = _
}


RemoteMessage.scala程式碼

package cn.heres.rpc

/**
  * Created by vinsuan on 2017/6/15 0015.
  */
trait RemoteMessage extends Serializable

//worker->Master
case class RegisterWorker(id:String , memory : Int ,cores:Int) extends RemoteMessage
case class Heartbeat(id:String ) extends RemoteMessage

//Master->worker
case class RegisteredWorker(masterUrl: String) extends RemoteMessage
//worker->self
case object SendHeartbeat
//Master->self
case object CheckTimeOutWorker


Worker.scala程式碼:

package cn.heres.rpc

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

/**
  * Created by vinsuan on 2017/6/13 0013.
  */
class Worker(val masterHost : String ,val masterPost : Int,val memory: Int ,val cores :Int) extends Actor{

var master : ActorSelection = _
  val workerId = UUID.randomUUID().toString
  val HEARTBEAT_INTERVAL = 10000
//建立連線
  override def preStart(): Unit = {
     master = context.actorSelection(s"akka.tcp://
[email protected]
$masterHost:$masterPost/user/Master") master ! RegisterWorker(workerId,memory,cores) } override def receive: Receive = { case RegisteredWorker(masterUrl) => { println(masterUrl) //匯入隱式轉換 import context.dispatcher //啟動定時器傳送心跳 context.system.scheduler.schedule(0 millis,HEARTBEAT_INTERVAL millis,self,SendHeartbeat) } case SendHeartbeat => { println("send heartbeat to master") master ! Heartbeat(workerId) } } } object Worker{ def main(args: Array[String]) { val host = args(0) val port = args(1).toInt val masterHost = args(2) val masterPort = args(3).toInt val memory = args(4).toInt val cores = args(5).toInt //準備配置 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin val config = ConfigFactory.parseString(configStr) //ActorSystem 老大,輔助建立和監控下面的Actor ,它是單例的 val actorSystem = ActorSystem("workerSystem",config) //建立Actor val worker = actorSystem.actorOf(Props(new Worker(masterHost,masterPort,memory,cores)),"Worker") actorSystem.awaitTermination() } }


Maser.scala程式碼:

package cn.heres.rpc

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

/**
  * Created by vinsuan on 2017/6/13 0013.
  */
class Master(val host:String,val port:Int) extends  Actor {
  //workerId -> workerInfo
  val idtoWorker = new scala.collection.mutable.HashMap[String, WorkerInfo]()
  //workerInfo
  val workers = new scala.collection.mutable.HashSet[WorkerInfo]()
  val CHECK_INTERVAL = 15000

  //超時檢測的間隔
  override def preStart(): Unit = {
    println("preStart invoked")
    //匯入隱式轉換
    import context.dispatcher
    context.system.scheduler.schedule(0 millis, CHECK_INTERVAL millis, self, CheckTimeOutWorker)
  }

  //用於接收訊息
  override def receive: Receive = {
    case RegisterWorker(id, memory, cores) => {
      //判斷一下,是不是已經註冊過
      if (!idtoWorker.contains(id)) {
        //把Worker的資訊封裝起來儲存到記憶體中
        val workerInfo = new WorkerInfo(id, memory, cores)
        idtoWorker(id) = workerInfo
        workers += workerInfo
        sender ! RegisteredWorker(s"akka.tcp://[email protected]$host:$port/user/Master")
      }
    }
    case Heartbeat(id) => {
      if (idtoWorker.contains(id)) {
        val workerInfo = idtoWorker(id)
        //報活
        val currentTime = System.currentTimeMillis()
        workerInfo.lastHeartBeatTime = currentTime

      }
    }
    case CheckTimeOutWorker => {
      val currentTime = System.currentTimeMillis()
      val toRemove = workers.filter(x => currentTime - x.lastHeartBeatTime > CHECK_INTERVAL)
      for (w <- toRemove) {
        workers -= w
        idtoWorker -= w.id
      }
      println("還有幾個活著的worker:" + workers.size)
    }
  }
}


object Master {
  def main(args: Array[String]) {

    val host = args(0)
    val port = args(1).toInt
    //準備配置
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //ActorSystem 老大,輔助建立和監控下面的Actor ,它是單例的
    val actorSystem = ActorSystem("MasterSystem", config)
    //建立Actor
    val master = actorSystem.actorOf(Props(new Master(host, port)), "Master")
    actorSystem.awaitTermination()
  }
}


相關推薦

AKKA實現簡單RPC通訊模型2

上一篇文章,簡單地用AKKA實現了RPC通訊,這篇文章在之前的基礎之上,進行了更多地關於RPC通訊的操作,包括傳一些相關資料,包括心跳機制等 具體功能如下圖: 程式碼結構: WorkerInfo.scala程式碼 package cn.heres.rpc /**

Scala學習筆記(10)—— Akka 實現簡單 RPC 框架

1 Akka 介紹 目前大多數的分散式架構底層通訊都是通過RPC實現的,RPC框架非常多,比如前我們學過的Hadoop專案的RPC通訊框架,但是Hadoop在設計之初就是為了執行長達數小時的批量而設計的,在某些極端的情況下,任務提交的延遲很高,所有Hadoop的

linux網路程式設計之socket實現簡單客戶端和服務端的通訊(基於TCP)

一、介紹基於TCP協議通過socket實現網路程式設計常用API 1、讀者如果不是很熟悉,可以先看我之前寫的幾篇部落格,有socket,地址結構的理解,更加方便讀者理解 地址分別是: 2、socket(TCP)程式設計API簡介 1)、socket int s

linux網路程式設計之socket實現簡單客戶端和服務端的通訊(基於UDP)

1、sendto和recvfrom函式介紹 sendto(經socket傳送資料) 相關函式 send , sendmsg,recv , recvfrom , socket 表頭檔案 #include < sys/types.h >#includ

分散式Web應用----基於Socket+動態代理實現簡單RPC 生產者消費者模型

寫在前面 前面一文主要簡單介紹了JAVA動態代理基本原理,這也是實現RPC的基本知識,這裡我們運用Socket簡單實現一個遠端過程呼叫,方便大家理解RPC的基本原理,希望對大家有所幫助。 新建People介面類與Man實現類 介面類 pu

java實現簡單快速的webservice客戶端/資料採集器(支援soap1.1和soap1.2標準,支援utf-8編碼)

前言: 用了cxf,axis等各種wbeservice實現庫,簡單試用了一下動態呼叫的方式,很不滿意,完全無法滿足業務的需要,所以自己實現了一個webservice採集客戶端,方便動態呼叫外部webservice介面。 一、實現的功能 1、soap1.1客戶端(soap1.

BlockingQueue實現簡單的生產者-消費者模型

package com.example.test; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurren

jQuery實現簡單的DOM操作

追加 匹配 簡單的 rep 兄弟節點 子元素 spa 添加元素 新建 通過jQuery創建元素節點:$oLi = $("<li></li>");這樣我們就創建了一個li標簽 如果想在元素節點中添加文本的話也挺簡單:$oLi = $("<li&g

jq實現簡單輪播

cti 標簽 function radi har mil ott ava 懸停 一個簡單的實例: css代碼: <style type="text/css">@charset "utf-8";*{ padding:0px; border:

javascript實現簡單戶登錄驗證

-c turn get https 沒有 驗證 utf-8 .com head 1 <!DOCTYPE html> 2 <html lang="en"> 3 <head> 4 <meta charset="

【SSH學習筆記】Struts2實現簡單戶登錄

utf-8 png rds href -a his ets 屬性 url 準備階段 在使用學習Struts2的時候首先要下載相應的架包 Struts2資源下載 這裏建議下載第一個,在struts-2.5.14.1-all.zip裏有很多實用的東西,不僅有架包還有官方為開發

神級程序員教你Python實現簡單的導彈自動追蹤!此乃裝逼神技!

大致 範圍 發現 完美 容易 game 分析 iss 兩個 由於待會要用pygame演示,他的坐標系是y軸向下,所以這裏我們也用y向下的坐標系。 計算sina和cosa,正弦對比斜,余弦鄰比斜,斜邊可以利用兩點距離公式計算出,即: 於是 AC的長度就是導彈的速度乘

基於select類型多路IO復實現簡單socket並發

清理 就是 ive class sockets true 簡單 Coding conn 還有很多缺限,如客戶斷開無限重復 以下轉至老師博客: server: #!/usr/bin/env python # -*- coding: utf-8 -*- __author__

BootNettyRpc:采 Netty 實現RPC 框架

ofo 文件配置 RR 實現 端口 監控 ble tin cto 什麽是 BootNettyRpc?BootNettyRpc 是一個采用Netty實現的Rpc框架,適用於Spring Boot項目,支持Spring Cloud。 目前支持的版本為Spring Boot 1.

Python實現簡單的名片管理系統

rem 代碼 card 刪除 import val 字典 fin pytho 首先新建項目,包含主程序cards_main和工具庫cards_tools: 如下:main函數主主要使用while和if實現4功能切換,切換的具體操作由cards_tools中的函數執行。不說了

linux網絡編程之socket實現簡單客戶端和服務端的通信(基於UDP)

服務端 msg ets lin fgets err n) stderr tcp 單客戶端和服務端的通信(基於UDP) 代碼 服務端代碼socket3.c #include<sys/types.h> #include<sys/socket.h>

JavaScript 實現簡單拼圖遊戲

本篇主要講解,如何利用原生的 JavaScript 來實現一個簡單的拼圖小遊戲。 線上體驗地址:拼圖 一、遊戲的基礎邏輯 想用一門語言來開發遊戲,必須先了解如何使用這門語言來實現一些基礎邏輯,比如影象的繪製、互動的處理、定時器等。 1、圖形繪製 圖形繪製是一切的基礎,這裡使用 Ja

SpringCloud(一) springboot實現簡單服務呼叫

分享一下我老師大神的人工智慧教程吧。零基礎,通俗易懂!風趣幽默!http://www.captainbed.net/ 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

JavaScript實現簡單的驗證碼

                                用JavaScript實現簡單的驗證碼 先展示一下最終的效果圖

Tensorflow 實現簡單線性迴歸模型

    Tensorflow是深度學習常用的一個框架,從目前官方文件看,Tensorflow支援CNN、RNN和LSTM演算法,這都是目前在Image,Speech和NLP領域最流行的深度神經網路模型。     為了熟悉和理解tensor