1. 程式人生 > >Spark RPC實現原理分析

Spark RPC實現原理分析

Spark RPC模組架構圖

Spark RPC是按照MailBox的設計思路來實現的,為了能夠更直觀地表達RPC的設計,我們先從RPC架構圖來看,如下圖所示:

Spark RPC體系結構圖

該圖主要描述了從客戶端向伺服器端傳送遠端訊息在伺服器端傳送本地訊息的過程,該過程中顯示了訊息的流動及格式轉換,還有各個元件之間的分工協作。

元件介紹

核心元件

Spark RPC通訊主要有RpcEnvRpcEndpointRpcEndpointRef這三個核心類。

  1. RpcEndpoint

    該類定義了RPC通訊過程中的伺服器端物件,除了具有管理一組RpcEndpoint生命週期的操作(constructor -> onStart -> receive* -> onStop)

    ,並給出了通訊過程中RpcEndpoint所具有的基於事件驅動的行為(連線、斷開、網路異常),實際上對於Spark框架來說主要是接收訊息並處理

    private[spark] trait RpcEndpoint {
    
       /**
         * 當前RpcEndpoint所註冊的[[RpcEnv]]
         */
       val rpcEnv: RpcEnv
    
       /**
         * 當前[[RpcEndpoint]]的代理,當`onStart`方法被呼叫時`self`生效,當`onStop`被呼叫時,`self`變成null。
         * 注意:在`onStart`方法被呼叫之前,[[RpcEndpoint]]物件還未進行註冊,所以就沒有有效的[[RpcEndpointRef]]。
         */
    final def self: RpcEndpointRef = { require(rpcEnv != null, "rpcEnv has not been initialized") rpcEnv.endpointRef(this) } /** * 用於處理從`RpcEndpointRef.send` 或 `RpcCallContext.reply`接收到的訊息。 * 如果接收到一個不匹配的訊息,將會丟擲SparkException異常,併發送給`onError` * * 通過上面的receive方法,接收由RpcEndpointRef.send方法傳送的訊息, * 該類訊息不需要進行響應訊息(Reply),而只是在RpcEndpoint端進行處理。 */
    def receive: PartialFunction[Any, Unit] = { case _ => throw new SparkException(self + " does not implement 'receive'") } /** * 處理來自`RpcEndpointRef.ask`的訊息,RpcEndpoint端處理完訊息後,需要給呼叫RpcEndpointRef.ask的通訊端返回響應訊息。 */ def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case _ => context.sendFailure(new SparkException(self + " won't reply anything")) } /** * 在處理訊息期間出現異常的話將被呼叫 */ def onError(cause: Throwable): Unit = { // By default, throw e and let RpcEnv handle it throw cause } /** * 當有遠端連線到當前伺服器時會被呼叫 */ def onConnected(remoteAddress: RpcAddress): Unit = { // By default, do nothing. } /** * 當遠端與當前伺服器斷開時,該方法會被呼叫 */ def onDisconnected(remoteAddress: RpcAddress): Unit = { // By default, do nothing. } /** * 當前節點與遠端之間的連線發生錯誤時,該方法將會被呼叫 */ def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = { // By default, do nothing. } /** * 在 [[RpcEndpoint]] 開始處理訊息之前被呼叫 */ def onStart(): Unit = { // By default, do nothing. } /** * 當[[RpcEndpoint]]正在停止時,該方法將會被呼叫。 * `self`將會在該方法中被置位null,因此你不能使用它來發送訊息。 */ def onStop(): Unit = { // By default, do nothing. } /** * A convenient method to stop [[RpcEndpoint]]. */ final def stop(): Unit = { val _self = self if (_self != null) { rpcEnv.stop(_self) } } }
  2. RpcEndpointRef

    RpcEndpointRef是一個對RpcEndpoint的遠端引用物件,通過它可以向遠端的RpcEndpoint端傳送訊息以進行通訊。RpcEndpointRef特質的定義,程式碼如下所示:

    private[spark] abstract class RpcEndpointRef(conf: SparkConf) extends Serializable with Logging {
    
       private[this] val maxRetries = RpcUtils.numRetries(conf)
       private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
       private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
    
       /**
         * 返回[RpcEndpointRef]]的引用的遠端伺服器地址
         */
       def address: RpcAddress
    
       def name: String
    
       /**
         * 傳送一條單向的非同步訊息,並且傳送訊息後不等待響應,亦即Send-and-forget。
         */
       def send(message: Any): Unit
    
       /**
         * 傳送訊息給相關的[[RpcEndpoint.receiveAndReply]],並且返回一個 Future,能夠在timeout時間內接收回復。
         * 該方法只會傳送一次訊息,失敗後不重試。
         * 而ask方法傳送訊息後需要等待通訊對端給予響應,通過Future來非同步獲取響應結果。
         */
       def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
    
       /**
         * 傳送訊息給相關的[[RpcEndpoint.receiveAndReply]],並且返回一個 Future,能夠在defaultAskTimeout時間內接收回復。
         * 該方法只會傳送一次訊息,失敗後不重試。
         * 而ask方法傳送訊息後需要等待通訊對端給予響應,通過Future來非同步獲取響應結果。
         */
       def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
    
       /**
         * 傳送訊息給相關的[[RpcEndpoint.receiveAndReply)]],並且返回一個Future,能夠在defaultAskTimeout時間內接收回復,如果超時則丟擲異常。
         * 注意:該方法會阻塞當前執行緒,
         *
         * @param message the message to send
         * @tparam T type of the reply message
         * @return the reply message from the corresponding [[RpcEndpoint]]
         */
       def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
    
       /**
         * 傳送訊息給相關的[[RpcEndpoint.receiveAndReply)]],並且返回一個Future,能夠在timeout時間內接收回復,如果超時則丟擲異常。
         * 注意:該方法會阻塞當前執行緒,
         *
         * @param 傳送的訊息內容
         * @param 超時時長
         * @tparam 響應訊息的型別
         * @return 從[[RpcEndpoint]]端響應的訊息內容
         */
       def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
           val future = ask[T](message, timeout)
           timeout.awaitResult(future)
       }
    }
    

    上面的send方法的實現在NettyRpcEndpointRef中,表示傳送訊息後不等待響應,亦即Send-and-forget,具體實現如下:

    override def send(message: Any): Unit = {
     require(message != null, "Message is null")
     nettyEnv.send(new RequestMessage(nettyEnv.address /*如果是遠端訊息,則為null*/ , endpointRef, message))
    }

    可見,它是通過NettyRpcEnv來發送RequestMessage訊息,並將當前NettyRpcEndpointRef封裝到RequestMessage訊息物件中傳送出去,通訊對端通過該NettyRpcEndpointRef能夠識別出訊息來源。

    而ask方法傳送訊息後需要等待通訊對端給予響應,通過Future來非同步獲取響應結果,也是在NettyRpcEndpointRef中實現,如下所示:

    override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
     nettyEnv.ask(new RequestMessage(nettyEnv.address /*如果是遠端訊息,則為null*/ , endpointRef, message), timeout)
    }

    類似的,也是通過NettyRpcEnv來發送一個RequestMessage訊息。

  3. RpcEnv

    一個RpcEnv是一個RPC環境物件,它負責管理RpcEndpoints的註冊,以及如何從一個RpcEndpoint獲取到一個RpcEndpointRef。RpcEndpoint是一個通訊端,例如Spark叢集中的Master,或Worker,都是一個RpcEndpoint。但是,如果想要與一個RpcEndpoint端進行通訊,一定需要獲取到該RpcEndpoint一個RpcEndpointRef,而獲取該RpcEndpointRef只能通過一個RpcEnv環境物件來獲取。

    private[spark] abstract class RpcEnv(conf: SparkConf) {
    
       private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)
    
       /**
         * 返回已經註冊的[[RpcEndpoint]]的RpcEndpointRef。
         * 該方法只用於[[RpcEndpoint.self]]方法實現中。
         * 如果終端相關的[[RpcEndpointRef]]不存在,則返回null。
         */
       private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
    
       /**
         * 如果是伺服器模式,則返回當前伺服器監聽的地址;否則為空
         */
       def address: RpcAddress
    
       /**
         * 使用一個name來註冊一個[[RpcEndpoint]],並且返回它的[[RpcEndpointRef]]物件。
         * [[RpcEnv]]並不保證執行緒安全性。
         */
       def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
    
       /**
         * 通過一個URI來非同步檢索[[RpcEndpointRef]]物件
         */
       def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]
    
       /**
         * 通過一個URI來同步檢索[[RpcEndpointRef]]物件
         */
       def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
           defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
       }
    
       /**
         * 根據`address` 和 `endpointName`對 [[RpcEndpointRef]]進行同步檢索。
         */
       def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
           setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString) // URI:
       }
    
       /**
         * 停止指定的[[RpcEndpoint]]物件。
         */
       def stop(endpoint: RpcEndpointRef): Unit
    
       /**
         * 非同步關閉當前的[[RpcEnv]]。
         * 如果需要確保成功地退出[[RpcEnv]],在執行[[shutdown()]]之後需要呼叫[[awaitTermination()]]。
         */
       def shutdown(): Unit
    
       /**
         * 等待直到[[RpcEnv]]退出。
         * TODO do we need a timeout parameter?
         */
       def awaitTermination(): Unit
    
       /**
         * 如果沒有[[RpcEnv]]物件,那麼[[RpcEndpointRef]]將不能被反序列化。
         * 因此,如果任何反序列化的物件中包含了[[RpcEndpointRef]],那麼這些反序列化的程式碼都應該在該方法中執行。
         */
       def deserialize[T](deserializationAction: () => T): T
    
       /**
         * 用於返回檔案伺服器的例項。
         * 如果RpcEnv不是以伺服器模式執行,那麼該項可能為null。
         *
         */
       def fileServer: RpcEnvFileServer
    
       /**
         * 開啟一個通道從給定的URI下載檔案。
         * 如果由RpcEnvFileServer返回的URI使用"spark"模式,那麼該方法將會被工具類呼叫來進行檔案檢索。
         *
         * @param uri URI with location of the file.
         */
       def openChannel(uri: String): ReadableByteChannel
    
    }

收件箱InBox

從上面的體系結構圖可知,InBox作用於伺服器端。它與RpcEndpoint是一對一的關係,每一個命名唯一的RpcEndpoint對應一個執行緒安全的InBox。所有傳送給一個RpcEndpoint的訊息,都由對應的InBox進行儲存。InBox提供一個process方法實現,該方法會在一個dispatcher-event-loop執行緒池中被呼叫,將InBox中的訊息提供給關聯的RpcEndpoint進行消費。

需要注意的是,如果通訊端端點的實現是繼承自ThreadSafeRpcEndpoint,則表明該Endpoint不允許併發處理訊息。如果繼承自RpcEndpoint,那麼就可以併發的呼叫該服務。在具體的process方法中,如果enableConcurrent為false,即只允許單執行緒處理。那麼執行process方法時,如果numActiveThreads大於0,說明已經至少有一個執行緒正在處理,則立即返回,取消本次處理操作。

Inbox.stop方法用於停止RpcEndpointInBox,具體實現如下:

def stop(): Unit = inbox.synchronized {
  // 該方法必須加鎖,這樣可以確保 "OnStop"是最後一條訊息。
  if (!stopped) {
    enableConcurrent = false
    stopped = true
    messages.add(OnStop)
  }
}

它的原理是,首先該方法必須加鎖inbox.synchronized,保證”OnStop”是訊息列表中的最後一條訊息。其次,為了保證能夠安全地關閉RpcEndpoint,必須將enableConcurrent置位false,這樣只有一個執行緒能夠處理OnStop訊息,並安全地執行RpcEndpoint.onStop方法並釋放資源。當stop被執行後,其後向該InBox相關聯的RpcEndpoint傳送的訊息將全部drop,並輸出相應的日誌記錄。

訊息轉發路由Dispatcher

  1. 提供RpcEndpoint註冊

    在註冊RpcEndpoint時,每個RpcEndpoint都需要有一個唯一的名稱。在RpcEnv.setupEndpoint(name: String, endpoint: RpcEndpoint)方法的實現中就是直接呼叫registerRpcEndpoint進行端點註冊的,並返回一個NettyRpcEndpointRef

    def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
     val addr = RpcEndpointAddress(nettyEnv.address, name)
     val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
     synchronized {
       if (stopped) {
         throw new IllegalStateException("RpcEnv has been stopped")
       }
    
       // 不能重複註冊,根據RpcEndpoint的名稱來檢查唯一性
       if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
         throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
       }
    
       val data = endpoints.get(name)
       endpointRefs.put(data.endpoint, data.ref)
       // 向receivers添加當前的Endpoint,receivers表示當前有未讀訊息的使用者列表,
       receivers.offer(data) // for the OnStart message
     }
     endpointRef
    }
    • EndpointData是一個簡單的JavaBean類,用於封裝RpcEndpoint相關的元件和屬性:

      /**
      * 一個JavaBean物件,用於封裝Endpoint物件、Endpoint物件唯一名稱、EndpointRef以及收件箱
      * 
      * @param name     Endpoint name
      * @param endpoint 伺服器端
      * @param ref      伺服器端引用
      */
      private class EndpointData(val name: String, val endpoint: RpcEndpoint, val ref: NettyRpcEndpointRef) {
      val inbox = new Inbox(ref, endpoint)
      }
    • endpoints

      該屬性用於維護RpcEndpoint的名稱與EndpointData之間的對映關係\

發件箱OutBox

從上面的體系結構圖可知,OnBox作用於客戶端。類似於收件箱InBox,它與TransportClient是一對一的關係,而一個TransportClient對應著一個遠端的RpcEndPoint。OutBox提供的主要功能如下:

  1. 使用send方法將訊息投遞到發件箱

    OutBox提供send方法,會將RpcEnv中所有轉發給某個RpcEndPoint的訊息都先放到一個messages連結串列中,最後就開始使用drainOutbox來清空發件箱。

    /**
     * 用於傳送訊息。
     * - 如果目前沒有可用的連線,則將訊息快取並建立一個連線。
     * - 如果[[Outbox]]已經停止,那麼sender將會丟擲一個[[SparkException]]
     */
    def send(message: OutboxMessage): Unit = {
     val dropped = synchronized {
       if (stopped) {
         true
       } else {
         messages.add(message)
         false
       }
     }
     if (dropped) {
       // 如果[[Outbox]]已經停止,那麼sender將會丟擲一個[[SparkException]]
       message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
     } else {
       drainOutbox()
     }
    }
  2. 使用drainOutbox清空發件箱

    drainOutbox主要用於清空發件箱中的訊息,訊息會通過傳輸層TransportClient傳送給遠端伺服器。該方法會在開始處進行一系列的檢查,需要保證傳輸層的連線已經建立,如果沒有建立,則向nettyEnv.clientConnectionExecutor提交建立連線的任務後並返回,待連線任務完成後會再次呼叫drainOutox方法。另外,drainOutox會保證執行緒安全性,通過布林值draining可以保證同一時刻只會有一個執行緒能夠進行訊息的處理和傳送。

    具體的訊息傳送邏輯則交由OutboxMessage的實現來完成,OutboxMessage有兩個子類,OneWayOutboxMessageRpcOutboxMessage,分別對應呼叫RpcEndpointreceivereceiveAndReply方法。

    /**
     * sealed作用:
     * 1. 其修飾的trait,class只能在當前檔案裡面被繼承
     * 2. 用sealed修飾這樣做的目的是告訴scala編譯器在檢查模式匹配的時候,讓scala知道這些case的所有情況,
     *    scala就能夠在編譯的時候進行檢查,看你寫的程式碼是否有沒有漏掉什麼沒case到,減少程式設計的錯誤。
     */
    private[netty] sealed trait OutboxMessage {
    
       def sendWith(client: TransportClient): Unit
    
       def onFailure(e: Throwable): Unit
    
    }

    對於OneWayOutboxMessage,由於不需要返回值,則簡單地通過呼叫傳輸層client.send方法將訊息發出。

    private[netty] case class OneWayOutboxMessage(content: ByteBuffer) extends OutboxMessage with Logging {
    
       override def sendWith(client: TransportClient): Unit = {
           client.send(content)
       }
    
       override def onFailure(e: Throwable): Unit = {
           e match {
               case e1: RpcEnvStoppedException => logWarning(e1.getMessage)
               case e1: Throwable => logWarning(s"Failed to send one-way RPC.", e1)
           }
       }
    
    }

    對於RpcOutboxMessage,由於需要伺服器的響應結果,因此需要實現傳輸層提供的RpcResponseCallback介面,並提供onFailureonSuccess的方法實現。在實際的傳送訊息時會使用client.sendRpc方法,將訊息內容和RpcResponseCallback物件傳遞給傳輸層,該方法會立即返回一個requestId

    而傳輸層底層會有獨立的執行緒負責將訊息序列化並且傳送出去,每個Message都會返回一個UUID,由底層來維護一個傳送出去訊息與其CallbackHashMap

    • 如果請求超時,會通過requestId在傳輸層中移除該RPC請求,從而達到取消訊息傳送的效果;
    • 如果請求的訊息成功返回,則會使用RpcResponseCallback物件根據返回的狀態回撥對應的onFailure和onSuccess的方法,進而回調Spark core中的業務邏輯,執行Promise/Future的done方法,上層退出阻塞。
    private[netty] case class RpcOutboxMessage(content: ByteBuffer,
                                              _onFailure: (Throwable) => Unit,
                                              _onSuccess: (TransportClient, ByteBuffer) => Unit)
           extends OutboxMessage with RpcResponseCallback with Logging {
    
       private var client: TransportClient = _
       private var requestId: Long = _
    
       override def sendWith(client: TransportClient): Unit = {
           this.client = client
           this.requestId = client.sendRpc(content, this)
       }
    
       def onTimeout(): Unit = {
           if (client != null) {
               client.removeRpcRequest(requestId)
           } else {
               logError("Ask timeout before connecting successfully")
           }
       }
    
       override def onFailure(e: Throwable): Unit = {
           _onFailure(e)
       }
    
       override def onSuccess(response: ByteBuffer): Unit = {
           _onSuccess(client, response)
       }
    
    }
  3. 關閉收件箱

    網路連線錯誤和RpcEnv的停止執行都會觸發OutBox的關閉和資源的清理,OutBox關閉的處理邏輯如下:

    • 如果connectFuture不為空,說明這會正在執行連線任務,那麼呼叫connectFuture.cancel(true)方法,將任務取消。

    • 呼叫closeClient方法,關閉客戶端,這裡僅僅將client引用置為null,但並不是真正的關閉,因為需要重用連線。

    • 呼叫nettyEnv.removeOutbox(remoteAddress)方法,從nettyEnv中移除OutBox,因此將來的訊息將會使用一個新的或原有的client連線並建立一個新的OutBox。
    • 執行所有還未處理的訊息的onFailure方法,並告知失敗的原因。

Spark RPC服務的流程

使用示例

package org.apache.spark.rpc

import org.apache.spark.rpc.netty.NettyRpcEnvFactory
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
import scala.concurrent.duration._
import scala.language.postfixOps

class SparkRPCTest extends SparkFunSuite with BeforeAndAfterAll {

    def createRpcEnv(conf: SparkConf, name: String, port: Int, clientMode: Boolean = false): RpcEnv = {
        val config = RpcEnvConfig(conf, name, "localhost", "localhost", port, new SecurityManager(conf), clientMode)
        new NettyRpcEnvFactory().create(config)
    }

    test("send a message to server from local") {
        var serverRpcEnv: RpcEnv = null
        try {
            @volatile var serverReceivedMsg: String = null
            serverRpcEnv = createRpcEnv(new SparkConf(), "server", 0)
            val serverEndpointRef = serverRpcEnv.setupEndpoint("server-endpoint", new RpcEndpoint {
                override val rpcEnv: RpcEnv = serverRpcEnv

                override def receive: PartialFunction[Any, Unit] = {
                    case msg: String => serverReceivedMsg = msg
                }
            })
            serverEndpointRef.send("hello")
            eventually(timeout(5 seconds), interval(10 millis)) {
                assert("hello" === serverReceivedMsg)
            }
        } finally {
            destory(serverRpcEnv)
        }
    }

    test("send a message to server from client") {
        var serverRpcEnv: RpcEnv = null
        var clientRpcEnv: RpcEnv = null
        try {
            @volatile var serverReceivedMsg: String = null
            serverRpcEnv = createRpcEnv(new SparkConf(), "server", 0)
            serverRpcEnv.setupEndpoint("server-endpoint", new RpcEndpoint {
                override val rpcEnv: RpcEnv = serverRpcEnv

                override def receive: PartialFunction[Any, Unit] = {
                    case msg: String => serverReceivedMsg = msg
                }
            })
            clientRpcEnv = createRpcEnv(new SparkConf(), "client", 0, clientMode = true)
            val serverEndpointRef = clientRpcEnv.setupEndpointRef(serverRpcEnv.address, "server-endpoint")
            serverEndpointRef.send("hello")
            eventually(timeout(5 seconds), interval(10 millis)) {
                assert("hello" === serverReceivedMsg)
            }
        } finally {
            destory(clientRpcEnv)
            destory(serverRpcEnv)
        }
    }

    def destory(env: RpcEnv): Unit = {
        try {
            if (env != null) {
                env.shutdown()
                env.awaitTermination()
            }
        } finally {
            super.afterAll()
        }
    }
}

伺服器啟動

伺服器響應

伺服器響應主要分為兩個階段,

  1. 第一階段,IO接收

    TransportRequestHandler是netty的回撥handler,它會將請求最終轉發給內部NettyRpcHandler的receive方法進行處理,方法內部會呼叫Dispatcher將RpcMessage投遞到對應的Inbox中,到此結束。

  2. 第二階段,IO響應

    MessageLoop會獲取待處理的RpcMessage進行處理,實際上就是呼叫RpcEndpoint的業務邏輯得到對應的結果,之後通過回撥NettyRpcCallContext.reply方法,將響應結果序列化後,再通過回撥RpcResponseCallback.onSuccess方法將序列化後的結果傳給TransportRequestHandler.this.respond方法,該方法會將執行結果封裝為RpcResponse後通過Channel傳送回客戶端。

客戶端請求

  1. 第一階段,IO傳送

    利用RpcEndpointRefsend或者ask動作。這裡以send為例,send會先進行訊息的序列化,然後投遞到指定地址的Outbox中,Outbox如果發現連線未建立則先嚐試建立連線,然後呼叫底層的TransportClient傳送資料,直接通過該netty的API完成,完成後即可返回。這裡返回了UUID作為訊息的標識,用於下一個階段的回撥,使用的角度來說可以返回一個Future,客戶端可以阻塞或者繼續做其他操作。

  2. 第二階段,IO接收

    TransportResponseHandler接收到遠端的響應後,會根據RpcResponse中的requestId獲得RpcResponseCallback,然後根據回撥對應的onSuccess方法。接著將執行結果反序列化後,回撥第一階段的Future,完成呼叫。

相關推薦

Spark RPC實現原理分析

Spark RPC模組架構圖 Spark RPC是按照MailBox的設計思路來實現的,為了能夠更直觀地表達RPC的設計,我們先從RPC架構圖來看,如下圖所示: 該圖主要描述了從客戶端向伺服器端傳送遠端訊息和在伺服器端傳送本地訊息的過程,該過程

Semaphore實現原理分析

業務需求 err java並發 裏的 eas static 默認 rem lac synchronized的語義是互斥鎖,就是在同一時刻,只有一個線程能獲得執行代碼的鎖。但是現實生活中,有好多的場景,鎖不止一把。 比如說,又到了十一假期,買票是重點,必須圈起來。在購票大廳裏

Spark之Task原理分析

finish lease finall .com 反序 eap wrap setresult add 在Spark中,一個應用程序要想被執行,肯定要經過以下的步驟: 從這個路線得知,最終一個job是依賴於分布在集群不同節點中的task,通過並行或者並發的運

php session實現原理分析

keep enc accep referer zip image time -s accept http://www.jb51.net/article/77726.htm 第一次會話時會有Set-Cookie響應頭返回,設置上PHPSESSID cookie Cache

Java原子類實現原理分析

upd hat 16px 檢查 () 過程 jvm api 處理 並發包中的原子類可以解決類似num++這樣的復合類操作的原子性問題,相比鎖機制,使用原子類更精巧輕量,性能開銷更小,下面就一起來分析下原子類的實現機理。 悲觀的解決方案(阻塞同步)   我們知道,num++看

Android系統的智能指針(輕量級指針、強指針和弱指針)的實現原理分析【轉】

其中 sin 類的定義 reason ava tab eas file 現在 Android系統的運行時庫層代碼是用C++來編寫的,用C++ 來寫代碼最容易出錯的地方就是指針了,一旦使用不當,輕則造成內存泄漏,重則造成系統崩潰。不過系統為我們提供了智能指針,避免出現上述問題

Tomcat 的 ErrorPage 實現原理分析

esp 轉發 isp ORC code compare on() sta ger https://www.cnblogs.com/softidea/p/5981766.html 使用Tomcat,一定見到過404,500的時候,見到過Tomcat提供的錯誤頁面,例如請求的資

轉:HashMap實現原理分析(面試問題:兩個hashcode相同 的對象怎麽存入hashmap的)

影響 strong 就會 怎麽 ash 地方 shm nbsp 擔心 原文地址:https://www.cnblogs.com/faunjoe88/p/7992319.html 主要內容: 1)put 疑問:如果兩個key通過hash%Entry[].length得到的

HashMap實現原理分析及簡單實現一個HashMap

HashMap實現原理分析及簡單實現一個HashMap 歡迎關注作者部落格 簡書傳送門 轉載@原文地址   HashMap的工作原理是近年來常見的Java面試題。幾乎每個Java程式設計師都知道HashMap,都知道哪裡要用HashMap,知道HashMap和

根據時間戳轉Date實現(Java)及實現原理分析

時間戳是指格林威治時間1970年01月01日00時00分00秒(北京時間1970年01月01日08時00分00秒)起至現在的總秒數。 本次實現跟根據Java.text.* 包中的工具類實現的,示例程式碼: import java.text.SimpleDateFormat; public

OkHttp3實現原理分析(二)

概述 前言:前一節https://mp.csdn.net/postedit/84941253,總結了一下OkHttp3的簡單使用教程。在專案中使用了這個網路框架,在看完基本的原始碼之後,還是想總結一下OkHttp的實現流程。在學習框架的過程中,從使用方法出發,首先是怎麼使用,其次是我們使

MySQL Order By實現原理分析和Filesort優化

. 目錄(?)[-] 在MySQL中的ORDER BY有兩種排序實現方式: 1、利用有序索引獲取有序資料 2、檔案排序 在使用explain分析查詢的時候,利用有序索引獲取有序資料顯示Using index。而檔案排序顯示Using filesort。 1.利

Java JDK 動態代理使用及實現原理分析

一、什麼是代理? 代理是一種常用的設計模式,其目的就是為其他物件提供一個代理以控制對某個物件的訪問。代理類負責為委託類預處理訊息,過濾訊息並轉發訊息,以及進行訊息被委託類執行後的後續處理。 代理模式 UML 圖: 簡單結構示意圖: 為了保持行為的一致性,代

Spark:Task原理分析

在Spark中,一個應用程式要想被執行,肯定要經過以下的步驟:          從這個路線得知,最終一個job是依賴於分佈在叢集不同節點中的task,通過並行或者併發的執行來完成真正的工作。由此可見,一個個的分散式的task才是Spark的真正執行者。下面先來張task

Guava TreeMultiSet實現原理分析(2)

5 count,size AvlNode為資料統計提供了多個便利引數,不需要遍歷所有的子節點就可以獲得相關的個數資訊。 AvlNode的統計屬性: elemCount:統計key相同的元素個數。 distinctElements:統計子樹中所有節點的個數,即ke

Ribbon使用及其客戶端負載均衡實現原理分析

1、ribbon負載均衡測試 (1)consumer工程新增依賴 <dependency> <groupId>org.springframework.cloud</groupId> <artif

Android控制元件TextView的實現原理分析

                        在前面一個系列的文章中,我們以視窗為單位,分析了WindowManagerService服務的實現。同時,在再前面一個系列的文章中,我們又分析了視窗的組成。簡單來說,視窗就是由一系列的檢視按照一定的佈局組織起來的。實際上,每一個檢視都是一個控制元件,這些控制可以

聊聊併發(六)ConcurrentLinkedQueue的實現原理分析

1.    引言 在併發程式設計中我們有時候需要使用執行緒安全的佇列。如果我們要實現一個執行緒安全的佇列有兩種實現方式一種是使用阻塞演算法,另一種是使用非阻塞演算法。使用阻塞演算法的佇列可以用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實現,而非阻塞的實現方式則可以

Spring Aop 原始碼實現原理分析

更新:2018/4/2 修改字型、新增引言。0   引言AOP是Aspect Oriented Programing的簡稱,面向切面程式設計。AOP適合於那些具有橫切邏輯的應用:如效能監測,訪問控制,事務管理、快取、物件池管理以及日誌記錄。AOP將這些分散在各個業務邏輯中的程

Android訊息迴圈實現原理分析

Android訊息迴圈 在Android中,如果一個執行緒有訊息迴圈(如UI執行緒),那麼其他執行緒可以獲取它的Handler物件,使用這個Handler物件傳送訊息到訊息迴圈所在的執行緒,這個執行緒收到這個訊息後,可以做一些操作,最典型的就是子執行緒執行耗時