【Kafka原始碼】處理請求
阿新 • • 發佈:2019-01-06
在KafkaServer中的入口在:
apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
首先根據相關引數,例項化KafkaApis,然後例項化KafkaRequestHandlerPool。下面我們首先看下KafkaRequestHandlerPool。
一、KafkaRequestHandlerPool
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
numThreads: Int) extends Logging with KafkaMetricsGroup {
/* a meter to track the average free capacity of the request handlers */
private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
val threads = new Array[Thread](numThreads)
val runnables = new Array[KafkaRequestHandler](numThreads)
for(i <- 0 until numThreads) {
runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
threads(i).start()
}
//...
}
主要是啟動了numThreads個數的執行緒,然後執行緒中執行的內容是KafkaRequestHandler。
/**
* 響應kafka請求的執行緒
*/
class KafkaRequestHandler(id: Int,
brokerId: Int,
val aggregateIdleMeter: Meter,
val totalHandlerThreads: Int,
val requestChannel: RequestChannel,
apis: KafkaApis) extends Runnable with Logging {
this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
def run() {
while(true) {
try {
var req : RequestChannel.Request = null
while (req == null) {
// We use a single meter for aggregate idle percentage for the thread pool.
// Since meter is calculated as total_recorded_value / time_window and
// time_window is independent of the number of threads, each recorded idle
// time should be discounted by # threads.
val startSelectTime = SystemTime.nanoseconds
req = requestChannel.receiveRequest(300)
val idleTime = SystemTime.nanoseconds - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
}
if(req eq RequestChannel.AllDone) {
debug("Kafka request handler %d on broker %d received shut down command".format(
id, brokerId))
return
}
req.requestDequeueTimeMs = SystemTime.milliseconds
trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
apis.handle(req)//這邊是如何處理請求的重點
} catch {
case e: Throwable => error("Exception when handling request", e)
}
}
}
//shutdown。。
}
在run方法中,我們可以看到,主要處理訊息的地方是api.handle(req)。下面我們主要看下這塊的內容。
二、KafkaApis.handle
直接看程式碼:
/**
* Top-level method that handles all requests and multiplexes to the right api
*/
def handle(request: RequestChannel.Request) {
try {
trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
ApiKeys.forId(request.requestId) match {//根據requestId,呼叫不同的方法,處理不同的請求
case ApiKeys.PRODUCE => handleProducerRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
case e: Throwable =>
if (request.requestObj != null) {
request.requestObj.handleError(e, requestChannel, request)
error("Error when handling request %s".format(request.requestObj), e)
} else {
val response = request.body.getErrorResponse(request.header.apiVersion, e)
val respHeader = new ResponseHeader(request.header.correlationId)
/* If request doesn't have a default error response, we just close the connection.
For example, when produce request has acks set to 0 */
if (response == null)
requestChannel.closeConnection(request.processor, request)
else
requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))
error("Error when handling request %s".format(request.body), e)
}
} finally
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
2.1 ApiKeys列舉類
PRODUCE(0, "Produce"),//生產者訊息
FETCH(1, "Fetch"),//消費者獲取訊息
LIST_OFFSETS(2, "Offsets"),//獲取偏移量
METADATA(3, "Metadata"),//獲取topic源資料
LEADER_AND_ISR(4, "LeaderAndIsr"),
STOP_REPLICA(5, "StopReplica"),//停止副本複製
UPDATE_METADATA_KEY(6, "UpdateMetadata"),//更新源資料
CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),//controller停止
OFFSET_COMMIT(8, "OffsetCommit"),//提交offset
OFFSET_FETCH(9, "OffsetFetch"),//獲取offset
GROUP_COORDINATOR(10, "GroupCoordinator"),//組協調
JOIN_GROUP(11, "JoinGroup"),//加入組
HEARTBEAT(12, "Heartbeat"),//心跳
LEAVE_GROUP(13, "LeaveGroup"),//離開組
SYNC_GROUP(14, "SyncGroup"),//同步組
DESCRIBE_GROUPS(15, "DescribeGroups"),//描述組
LIST_GROUPS(16, "ListGroups"),//列出組
SASL_HANDSHAKE(17, "SaslHandshake"),//加密握手
API_VERSIONS(18, "ApiVersions");//版本
這塊比較簡單,主要的是Request的資料結構,還有後續的處理方法。下面我們逐步來分析。
三、Request資料結構
所有的請求,最終都會變成這個RequestChannel.Request。所以我們先看下這個Request。
case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
//...
val requestId = buffer.getShort()
private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom
)
val requestObj =
keyToNameAndDeserializerMap.get(requestId).map(readFrom => readFrom(buffer)).orNull
val header: RequestHeader =
if (requestObj == null) {
buffer.rewind
try RequestHeader.parse(buffer)
catch {
case ex: Throwable =>
throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: $requestId", ex)
}
} else
null
val body: AbstractRequest =
if (requestObj == null)
try {
// For unsupported version of ApiVersionsRequest, create a dummy request to enable an error response to be returned later
if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion))
new ApiVersionsRequest
else
AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
} catch {
case ex: Throwable =>
throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
}
else
null
buffer = null
private val requestLogger = Logger.getLogger("kafka.request.logger")
def requestDesc(details: Boolean): String = {
if (requestObj != null)
requestObj.describe(details)
else
header.toString + " -- " + body.toString
}
//...
}
主要有幾個部分,
- 首先是requestId,是一個short型別的值。
- 然後是header,即訊息頭,是一個RequestHeader
- 最後是body,是訊息的內容,型別為AbstractRequest
3.1 requestId
這個requestId表示的是api的型別,KafkaApis需要根據這個requestId,來判斷呼叫哪個方法處理訊息。
3.2 header
我們看下RequestHeader的結構。
private final short apiKey;
private final short apiVersion;
private final String clientId;
private final int correlationId;
主要是四個變數,apiKey,APIVersion,clientId,correlationId。
3.3 body
訊息體,對應的類為AbstractRequest。主要的內容是根據版本號和apiKey來解析出訊息的具體內容。
public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffer buffer) {
ApiKeys apiKey = ApiKeys.forId(requestId);
switch (apiKey) {
case PRODUCE:
return ProduceRequest.parse(buffer, versionId);
case FETCH:
return FetchRequest.parse(buffer, versionId);
case LIST_OFFSETS:
return ListOffsetRequest.parse(buffer, versionId);
case METADATA:
return MetadataRequest.parse(buffer, versionId);
case OFFSET_COMMIT:
return OffsetCommitRequest.parse(buffer, versionId);
case OFFSET_FETCH:
return OffsetFetchRequest.parse(buffer, versionId);
case GROUP_COORDINATOR:
return GroupCoordinatorRequest.parse(buffer, versionId);
case JOIN_GROUP:
return JoinGroupRequest.parse(buffer, versionId);
case HEARTBEAT:
return HeartbeatRequest.parse(buffer, versionId);
case LEAVE_GROUP:
return LeaveGroupRequest.parse(buffer, versionId);
case SYNC_GROUP:
return SyncGroupRequest.parse(buffer, versionId);
case STOP_REPLICA:
return StopReplicaRequest.parse(buffer, versionId);
case CONTROLLED_SHUTDOWN_KEY:
return ControlledShutdownRequest.parse(buffer, versionId);
case UPDATE_METADATA_KEY:
return UpdateMetadataRequest.parse(buffer, versionId);
case LEADER_AND_ISR:
return LeaderAndIsrRequest.parse(buffer, versionId);
case DESCRIBE_GROUPS:
return DescribeGroupsRequest.parse(buffer, versionId);
case LIST_GROUPS:
return ListGroupsRequest.parse(buffer, versionId);
case SASL_HANDSHAKE:
return SaslHandshakeRequest.parse(buffer, versionId);
case API_VERSIONS:
return ApiVersionsRequest.parse(buffer, versionId);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
"code should be updated to do so.", apiKey));
}
}
這塊的請求型別很多,想要了解具體結構的,可以到每個類中具體看。