Spark 2.0 shuffle service
阿新 • Published: 2018-06-15
The shuffle service sits at the core of Spark. This article covers the non-ExternalShuffleClient path and walks through the overall block service architecture. ShuffleClient is the foundation of the framework; it exposes two methods, init and fetchBlocks.
/** Provides an interface for reading shuffle files, either from an Executor or external service. */
public abstract class ShuffleClient implements Closeable {
  /**
   * Initializes the ShuffleClient, specifying this Executor's appId.
   * Must be called before any other method on the ShuffleClient.
   */
  public void init(String appId) { }

  /**
   * Fetch a sequence of blocks from a remote node asynchronously.
   *
   * Note that this API takes a sequence so the implementation can batch requests, and does not
   * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
   * the data of a block is fetched, rather than waiting for all blocks to be fetched.
   */
  public abstract void fetchBlocks(
      String host,
      int port,
      String execId,
      String[] blockIds,
      BlockFetchingListener listener);
}
The BlockFetchingListener interface defines two callbacks. onBlockFetchSuccess is called once per successfully fetched block; after the call returns, the data is released automatically, so if the data is to be passed to another thread the receiver must call retain() and release() on the buffer itself, or copy the data into a new buffer. onBlockFetchFailure is called at least once per block upon failure. A usage sketch follows the interface below.
public interface BlockFetchingListener extends EventListener {
  /**
   * Called once per successfully fetched block. After this call returns, data will be released
   * automatically. If the data will be passed to another thread, the receiver should retain()
   * and release() the buffer on their own, or copy the data to a new buffer.
   */
  void onBlockFetchSuccess(String blockId, ManagedBuffer data);

  /**
   * Called at least once per block upon failures.
   */
  void onBlockFetchFailure(String blockId, Throwable exception);
}
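To make the contract concrete, here is a minimal Scala sketch (not from the Spark source) of a caller that fetches blocks through a ShuffleClient and copies each block into a fresh buffer before handing it off to another thread, as the listener contract requires. The object name, the queue, and the host, port, executor id and block id are all invented for illustration.
import java.nio.ByteBuffer
import java.util.concurrent.LinkedBlockingQueue
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}

object FetchExample {
  // Fetch one block and hand a private copy of its bytes to another thread via the queue.
  def fetchIntoQueue(
      client: ShuffleClient,
      results: LinkedBlockingQueue[(String, ManagedBuffer)]): Unit = {
    // Host, port, executor id and block id are made-up values.
    client.fetchBlocks("worker-1", 7337, "exec-2", Array("shuffle_0_1_2"),
      new BlockFetchingListener {
        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          // The incoming buffer is released once this callback returns, so copy it first.
          val copy = ByteBuffer.allocate(data.size.toInt)
          copy.put(data.nioByteBuffer())
          copy.flip()
          results.put((blockId, new NioManagedBuffer(copy)))
        }
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          // A real caller would retry or surface the error.
          System.err.println(s"Failed to fetch $blockId: ${exception.getMessage}")
        }
      })
  }
}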
BlockTransferService extends ShuffleClient and supplies shared implementations for several of its methods; a short usage sketch of its blocking helper follows the class listing below.
private[spark]
abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
  /**
   * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
   * local blocks or put local blocks.
   */
  def init(blockDataManager: BlockDataManager): Unit

  /**
   * Tear down the transfer service.
   */
  def close(): Unit

  /**
   * Port number the service is listening on, available only after [[init]] is invoked.
   */
  def port: Int

  /**
   * Host name the service is listening on, available only after [[init]] is invoked.
   */
  def hostName: String
  /**
   * Fetch a sequence of blocks from a remote node asynchronously,
   * available only after [[init]] is invoked.
   *
   * Note that this API takes a sequence so the implementation can batch requests, and does not
   * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
   * the data of a block is fetched, rather than waiting for all blocks to be fetched.
   */
  override def fetchBlocks(
      host: String,
      port: Int,
      execId: String,
      blockIds: Array[String],
      listener: BlockFetchingListener): Unit
  /**
   * Upload a single block to a remote node, available only after [[init]] is invoked.
   */
  def uploadBlock(
      hostname: String,
      port: Int,
      execId: String,
      blockId: BlockId,
      blockData: ManagedBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Future[Unit]
  /**
   * A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
   *
   * It is also only available after [[init]] is invoked.
   */
  def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
    // A monitor for the thread to wait on.
    val result = Promise[ManagedBuffer]()
    fetchBlocks(host, port, execId, Array(blockId),
      new BlockFetchingListener {
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          result.failure(exception)
        }
        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          val ret = ByteBuffer.allocate(data.size.toInt)
          ret.put(data.nioByteBuffer())
          ret.flip()
          result.success(new NioManagedBuffer(ret))
        }
      })
    ThreadUtils.awaitResult(result.future, Duration.Inf)
  }
  /**
   * Upload a single block to a remote node, available only after [[init]] is invoked.
   *
   * This method is similar to [[uploadBlock]], except this one blocks the thread
   * until the upload finishes.
   */
  def uploadBlockSync(
      hostname: String,
      port: Int,
      execId: String,
      blockId: BlockId,
      blockData: ManagedBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Unit = {
    val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag)
    ThreadUtils.awaitResult(future, Duration.Inf)
  }
}
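As mentioned above, here is a short, hedged usage sketch of the blocking helper. The wrapper object, helper name and the example arguments in the comment are invented, and since BlockTransferService is private[spark], code like this would have to live under the org.apache.spark package.
import java.nio.ByteBuffer
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.buffer.ManagedBuffer

object SyncFetchExample {
  // Synchronously pull one block from a remote executor and return its bytes,
  // e.g. readRemoteBlock(service, "worker-1", 7337, "exec-2", "shuffle_0_1_2").
  def readRemoteBlock(
      service: BlockTransferService,
      host: String,
      port: Int,
      execId: String,
      blockId: String): ByteBuffer = {
    val buf: ManagedBuffer = service.fetchBlockSync(host, port, execId, blockId)
    buf.nioByteBuffer() // fetchBlockSync already returned a private copy of the block's bytes
  }
}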
NettyBlockTransferService extends BlockTransferService.
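As a rough, hedged sketch (placeholder class name and bodies, not the real Netty implementation), a concrete subclass has to supply exactly the abstract members listed above; because BlockTransferService is private[spark], such a class must live under the org.apache.spark package.
import scala.concurrent.Future
import scala.reflect.ClassTag
import org.apache.spark.network.{BlockDataManager, BlockTransferService}
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.shuffle.BlockFetchingListener
import org.apache.spark.storage.{BlockId, StorageLevel}

// Placeholder skeleton, NOT the real NettyBlockTransferService.
class StubBlockTransferService extends BlockTransferService {
  private var dataManager: BlockDataManager = _

  override def init(blockDataManager: BlockDataManager): Unit = {
    dataManager = blockDataManager // remember where local blocks can be read and written
  }

  override def close(): Unit = ()

  override def port: Int = 7337               // made-up port
  override def hostName: String = "worker-1"  // made-up host name

  override def fetchBlocks(
      host: String,
      port: Int,
      execId: String,
      blockIds: Array[String],
      listener: BlockFetchingListener): Unit = {
    // A real implementation (such as the Netty one) would open a connection and stream
    // the blocks; the stub just reports failure for every block so it stays compilable.
    blockIds.foreach(id => listener.onBlockFetchFailure(id, new UnsupportedOperationException))
  }

  override def uploadBlock(
      hostname: String,
      port: Int,
      execId: String,
      blockId: BlockId,
      blockData: ManagedBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Future[Unit] =
    Future.failed(new UnsupportedOperationException("stub"))
}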