1. 程式人生 > >Spark RPC介面和Scala Actor簡介

Spark RPC介面和Scala Actor簡介

Spark1.6之前中使用的分散式多執行緒框架,是Akka。Akka也實現了類似Scala Actor的模型。spark1.4標準化這套介面為了能夠實現基於其他技術的rpc方案,並且最終也是這麼做的,在spark1.6版本中rpc的預設實現由基於akka的actor轉變為基於netty,解決了實際專案中可能存在的akka版本問題。

Scala的Actor類似於Java中的多執行緒程式設計。但是不同的是,Scala的Actor提供的模型與多執行緒有所不同。Scala的Actor儘可能地避免鎖和共享狀態,從而避免多執行緒併發時出現資源爭用的情況,進而提升多執行緒程式設計的效能。此外,Scala Actor的這種模型還可以避免死鎖等一系列傳統多執行緒程式設計的問題。

一、Actor的建立、啟動和訊息收發

Scala提供了Actor trait來讓我們更方便地進行actor多執行緒程式設計,就Actor trait就類似於Java中的Thread和Runnable一樣,是基礎的多執行緒基類和介面。我們只要重寫Actor trait的act方法,即可實現自己的執行緒執行體,與Java中重寫run方法類似。
// 此外,使用start()方法啟動actor;使用!符號,向actor傳送訊息;actor內部使用receive和模式匹配接收訊息

// 案例:Actor Hello World
import scala.actors.Actor

class HelloActor extends Actor {
  def act() {
    while (true) {
      receive {
        case name: String => println("Hello, " + name)
      }
    }
  }
}

val helloActor = new HelloActor
helloActor.start()

helloActor ! "leo"

二、收發case class型別的訊息

// Scala的Actor模型與Java的多執行緒模型之間,很大的一個區別就是,Scala Actor天然支援執行緒之間的精準通訊;即一個actor可以給其他actor直接傳送訊息。這個功能是非常強大和方便的。
// 要給一個actor傳送訊息,需要使用“actor ! 訊息”的語法。在scala中,通常建議使用樣例類,即case class來作為訊息進行傳送。然後在actor接收訊息之後,可以使用scala強大的模式匹配功能來進行不同訊息的處理。
// 案例:使用者註冊登入後臺介面
case class Login(username: String, password: String)
case class Register(username: String, password: String)
class UserManageActor extends Actor {
  def act() {
    while (true) {
      receive {
        case Login(username, password) => println("login, username is " + username + ", password is " + password)
        case Register(username, password) => println("register, username is " + username + ", password is " + password)
      }
    }
  }
}
val userManageActor = new UserManageActor
userManageActor.start()
userManageActor ! Register("leo", "1234"); userManageActor ! Login("leo", "1234")

三、 Actor之間互相收發訊息

// 如果兩個Actor之間要互相收發訊息,那麼scala的建議是,一個actor向另外一個actor傳送訊息時,同時帶上自己的引用;其他actor收到自己的訊息時,直接通過傳送訊息的actor的引用,即可以給它回覆訊息。
// 案例:打電話
case class Message(content: String, sender: Actor)
class LeoTelephoneActor extends Actor {
  def act() {
    while (true) {
      receive {
        case Message(content, sender) => { println("leo telephone: " + content); sender ! "I'm leo, please call me after 10 minutes." }
      }
    }
  }
}
class JackTelephoneActor(val leoTelephoneActor: Actor) extends Actor {
  def act() {
    leoTelephoneActor ! Message("Hello, Leo, I'm Jack.", this)
    receive {
      case response: String => println("jack telephone: " + response)
    }
  }

}

四、RPC

RPC通訊主要有RpcEnv、RpcEndpoint、RpcEndpointRef這三個核心類。
RpcEndpoint是一個通訊端,例如Spark叢集中的Master,或Worker,都是一個RpcEndpoint。但是,如果想要與一個RpcEndpoint端進行通訊,一定需要獲取到該RpcEndpoint一個RpcEndpointRef,通過RpcEndpointRef與RpcEndpoint進行通訊,只能通過一個RpcEnv環境物件來獲取RpcEndpoint對應的RPCEndpointRef。

RpcEndpoint => Actor
RpcEndpointRef => ActorRef
RpcEnv => ActorSystem

RpcEndpoint

RpcEndpoint對應actor例子中的Actor,用於處理資訊,

其有兩個重要方法,receivereceiveAndReply,區別是後者處理完資訊後會返回資訊給傳送者,類似tcp和udp。再看actor例子中我們定義的HelloActor,其處理資訊的方法名正是receive,一個RpcEndpoint的生命週期如下: onStart -> receive(receiveAndReply)* -> onStop

RpcEnv

RpcEnv對應actor例子中的ActorSystem,註冊並維護RpcEndpoint和RpcEndpointRef

主要方法為setupEndpoint,用法上對應例子中ActorSystem的actorOf方法,用於註冊RpcEndpoint,內部使用Dispatcher維護註冊的RpcEndpoint,也提供了多種獲取RpcEndpointRef的方法,如asyncSetupEndpointRefByURI、setupEndpointRefByURI和setupEndpointRef,以及移除RpcEndpoint的方法stop,關閉RpcEnv的方法shutdown,其還維護了RpcEnvFileServer,用於上傳下載jar和file。最後,例項化RpcEnv時,需指定是server模式還是client(預設是server),server模式下底層啟動netty

RpcEndpointRef
RpcEndpointRef對應actor例子中的ActorRef,是對遠端RpcEndpoint的一個引用,向對應的RpcEndpoint傳送資訊,
主要方法sendask,send方法只發送資訊,ask方法傳送資訊的同時接受返回值,