kafka 協議層API示例
阿新 • • 發佈:2019-01-01
眾所周知,Kafka自己實現了一套二進位制協議(binary protocol)用於各種功能的實現,比如傳送訊息,獲取訊息,提交位移以及建立topic等。具體協議規範參見:Kafka協議 這套協議的具體使用流程為:
1.客戶端建立對應協議的請求
2.客戶端傳送請求給對應的broker
3.broker處理請求,併發送response給客戶端
雖然Kafka提供的大量的指令碼工具用於各種功能的實現,但很多時候我們還是希望可以把某些功能以程式設計的方式嵌入到另一個系統中。這時使用Java API的方式就顯得異常地靈活了。本文我將嘗試給出Java API底層框架的一個範例,同時也會針對“建立topic”和“檢視位移”這兩個主要功能給出對應的例子。 需要提前說明的是,本文給出的範例並沒有考慮Kafka叢集開啟安全的情況。另外Kafka的KIP4應該一直在優化命令列工具以及各種管理操作,有興趣的讀者可以關注這個KIP。
本文中用到的API依賴於kafka-clients,所以如果你使用Maven構建的話,請加上:
1 2 3 4 5 |
<
dependency
>
<
groupId
>org.apache.kafka</
groupId
>
<
artifactId
>kafka-clients</
artifactId
>
<
version
>0.10.2.0</ version
>
</
dependency
>
|
如果是gradle,請加上:
?1 |
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.2.0'
|
底層框架
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
/**
* 傳送請求主方法
* @param host 目標broker的主機名
* @param port 目標broker的埠
* @param request 請求物件
* @param apiKey 請求型別
* @return 序列化後的response
* @throws IOException
*/
public
ByteBuffer send(String host,
int
port, AbstractRequest request, ApiKeys apiKey)
throws
IOException {
Socket socket = connect(host, port);
try
{
return
send(request, apiKey, socket);
}
finally
{
socket.close();
}
}
/**
* 傳送序列化請求並等待response返回
* @param socket 連向目標broker的socket
* @param request 序列化後的請求
* @return 序列化後的response
* @throws IOException
*/
private
byte
[] issueRequestAndWaitForResponse(Socket socket,
byte
[] request)
throws
IOException {
sendRequest(socket, request);
return
getResponse(socket);
}
/**
* 傳送序列化請求給socket
* @param socket 連向目標broker的socket
* @param request 序列化後的請求
* @throws IOException
*/
private
void
sendRequest(Socket socket,
byte
[] request)
throws
IOException {
DataOutputStream dos =
new
DataOutputStream(socket.getOutputStream());
dos.writeInt(request.length);
dos.write(request);
dos.flush();
}
/**
* 從給定socket處獲取response
* @param socket 連向目標broker的socket
* @return 獲取到的序列化後的response
* @throws IOException
*/
private
byte
[] getResponse(Socket socket)
throws
IOException {
DataInputStream dis =
null
;
try
{
dis =
new
DataInputStream(socket.getInputStream());
byte
[] response =
new
byte
[dis.readInt()];
dis.readFully(response);
return
response;
}
finally
{
if
(dis !=
null
) {
dis.close();
}
}
}
/**
* 建立Socket連線
* @param hostName 目標broker主機名
* @param port 目標broker服務埠, 比如9092
* @return 建立的Socket連線
* @throws IOException
*/
private
Socket connect(String hostName,
int
port)
throws
IOException {
return
new
Socket(hostName, port);
}
/**
* 向給定socket傳送請求
* @param request 請求物件
* @param apiKey 請求型別, 即屬於哪種請求
* @param socket 連向目標broker的socket
* @return 序列化後的response
* @throws IOException
*/
private
ByteBuffer send(AbstractRequest request, ApiKeys apiKey, Socket socket)
throws
IOException {
RequestHeader header =
new
RequestHeader(apiKey.id, request.version(),
"client-id"
,
0
);
ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
header.writeTo(buffer);
request.writeTo(buffer);
byte
[] serializedRequest = buffer.array();
byte
[] response = issueRequestAndWaitForResponse(socket, serializedRequest);
ByteBuffer responseBuffer = ByteBuffer.wrap(response);
ResponseHeader.parse(responseBuffer);
return
responseBuffer;
}
|
有了這些方法的鋪墊,我們就可以建立具體的請求了。
建立topic
?1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
/**
* 建立topic
* 由於只是樣例程式碼,有些東西就硬編碼寫到程式裡面了(比如主機名和埠),各位看官自行修改即可
* @param topicName topic名
* @param partitions 分割槽數
* @param replicationFactor 副本數
* @throws IOException
*/
public
void
createTopics(String topicName,
int
partitions,
short
replicationFactor)
throws
IOException {
Map<String, CreateTopicsRequest.TopicDetails> topics =
new
HashMap<>();
// 插入多個元素便可同時建立多個topic
topics.put(topicName,
new
CreateTopicsRequest.TopicDetails(partitions, replicationFactor));
int
creationTimeoutMs =
60000
;
CreateTopicsRequest request =
new
CreateTopicsRequest.Builder(topics, creationTimeoutMs).build();
ByteBuffer response = send(
"localhost"
,
9092
, request, ApiKeys.CREATE_TOPICS);
CreateTopicsResponse.parse(response, request.version());
}
|
檢視位移
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
/**
* 獲取某個consumer group下的某個topic分割槽的位移
* @param groupID group id
* @param topic topic名
* @param parititon 分割槽號
* @throws IOException
*/
public
void
getOffsetForPartition(String groupID, String topic,
int
parititon)
throws
IOException {
TopicPartition tp =
new
TopicPartition(topic, parititon);
OffsetFetchRequest request =
new
OffsetFetchRequest.Builder(groupID, singletonList(tp))
.setVersion((
short
)
2
).build();
ByteBuffer response = send(
"localhost"
,
9092
, request, ApiKeys.OFFSET_FETCH);
OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
OffsetFetchResponse.PartitionData partitionData = resp.responseData().get(tp);
System.out.println(partitionData.offset);
}
|
1 2 3 4 5 6 7 8 9 10 11 12 |
/**
* 獲取某個consumer group下所有topic分割槽的位移資訊
* @param groupID group id
* @return (topic分割槽 --> 分割槽資訊)的map
* @throws IOException
*/
public
Map<TopicPartition, OffsetFetchResponse.PartitionData> getAllOffsetsForGroup(String groupID)
throws
IOException {
OffsetFetchRequest request =
new
OffsetFetchRequest.Builder(groupID,
null
).setVersion((
short
)
2
).build();
ByteBuffer response = send(
"localhost"
,
9092
, request, ApiKeys.OFFSET_FETCH);
OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
return
resp.responseData();
}
|